Эх сурвалжийг харах

gw: Image subclasses hold track dicts rather than lists.

Keir Fraser 4 жил өмнө
parent
commit
5c5f9455ad

+ 20 - 18
scripts/greaseweazle/image/adf.py

@@ -13,10 +13,9 @@ class ADF(Image):
 
     default_format = 'amiga.amigados'
 
-    def __init__(self, start_cyl, nr_sides):
-        error.check(nr_sides == 2, "ADF: Must be double-sided")
+    def __init__(self):
         self.sec_per_track = 11
-        self.track_list = [None] * start_cyl
+        self.to_track = dict()
 
 
     @classmethod
@@ -25,7 +24,7 @@ class ADF(Image):
         with open(name, "rb") as f:
             dat = f.read()
 
-        adf = cls(0, 2)
+        adf = cls()
 
         nsec = adf.sec_per_track
         error.check((len(dat) % (2*nsec*512)) == 0, "Bad ADF image")
@@ -35,23 +34,24 @@ class ADF(Image):
             nsec *= 2
             adf.sec_per_track = nsec
 
-        for i in range(ncyl*2):
-            ados = amigados.AmigaDOS(tracknr=i, nsec=nsec)
-            ados.set_adf_track(dat[i*nsec*512:(i+1)*nsec*512])
-            adf.track_list.append(ados)
+        for tnr in range(ncyl*2):
+            ados = amigados.AmigaDOS(tracknr=tnr, nsec=nsec)
+            ados.set_adf_track(dat[tnr*nsec*512:(tnr+1)*nsec*512])
+            adf.to_track[tnr] = ados
 
         return adf
 
 
     def get_track(self, cyl, side):
-        off = cyl * 2 + side
-        if off >= len(self.track_list):
+        tnr = cyl * 2 + side
+        if not tnr in self.to_track:
             return None
-        return self.track_list[off].raw_track()
+        return self.to_track[tnr].raw_track()
 
 
-    def append_track(self, track):
-        self.track_list.append(track)
+    def emit_track(self, cyl, side, track):
+        tnr = cyl * 2 + side
+        self.to_track[tnr] = track
 
 
     def get_image(self):
@@ -59,19 +59,21 @@ class ADF(Image):
         tlen = self.sec_per_track * 512
         tdat = bytearray()
 
-        for tracknr in range(len(self.track_list)):
-            t = self.track_list[tracknr]
+        ntracks = max(self.to_track, default=0) + 1
+
+        for tnr in range(ntracks):
+            t = self.to_track[tnr] if tnr in self.to_track else None
             if t is not None and hasattr(t, 'get_adf_track'):
                 tdat += t.get_adf_track()
-            elif tracknr < 160:
+            elif tnr < 160:
                 # Pad empty/damaged tracks.
                 tdat += bytes(tlen)
             else:
                 # Do not extend past 160 tracks unless there is data.
                 break
 
-        if len(self.track_list) < 160:
-            tdat += bytes(tlen * (160 - len(self.track_list)))
+        if ntracks < 160:
+            tdat += bytes(tlen * (160 - ntracks))
 
         return tdat
 

+ 47 - 44
scripts/greaseweazle/image/hfe.py

@@ -14,13 +14,11 @@ from .image import Image
 
 class HFE(Image):
 
-    def __init__(self, start_cyl, nr_sides):
-        self.start_cyl = start_cyl
-        self.nr_sides = nr_sides
+    def __init__(self):
         self.bitrate = 250 # XXX real bitrate?
         # Each track is (bitlen, rawbytes).
         # rawbytes is a bytes() object in little-endian bit order.
-        self.track_list = []
+        self.to_track = dict()
 
 
     @classmethod
@@ -29,21 +27,21 @@ class HFE(Image):
         with open(name, "rb") as f:
             dat = f.read()
 
-        (sig, f_rev, nr_cyls, nr_sides, t_enc, bitrate,
+        (sig, f_rev, n_cyl, n_side, t_enc, bitrate,
          _, _, _, tlut_base) = struct.unpack("<8s4B2H2BH", dat[:20])
         error.check(sig != b"HXCHFEV3", "HFEv3 is not supported")
         error.check(sig == b"HXCPICFE" and f_rev <= 1, "Not a valid HFE file")
-        error.check(0 < nr_cyls, "HFE: Invalid #cyls")
-        error.check(0 < nr_sides < 3, "HFE: Invalid #sides")
+        error.check(0 < n_cyl, "HFE: Invalid #cyls")
+        error.check(0 < n_side < 3, "HFE: Invalid #sides")
         error.check(bitrate != 0, "HFE: Invalid bitrate")
-        
-        hfe = cls(0, nr_sides)
+
+        hfe = cls()
         hfe.bitrate = bitrate
 
-        tlut = dat[tlut_base*512:tlut_base*512+nr_cyls*4]
+        tlut = dat[tlut_base*512:tlut_base*512+n_cyl*4]
         
-        for cyl in range(nr_cyls):
-            for side in range(nr_sides):
+        for cyl in range(n_cyl):
+            for side in range(n_side):
                 offset, length = struct.unpack("<2H", tlut[cyl*4:(cyl+1)*4])
                 todo = length // 2
                 tdat = bytes()
@@ -53,18 +51,15 @@ class HFE(Image):
                     tdat += dat[d_off:d_off+d_nr]
                     todo -= d_nr
                     offset += 1
-                hfe.track_list.append((len(tdat)*8, tdat))
+                hfe.to_track[cyl,side] = (len(tdat)*8, tdat)
 
         return hfe
 
 
     def get_track(self, cyl, side):
-        if side >= self.nr_sides or cyl < self.start_cyl:
-            return None
-        off = cyl * self.nr_sides + side
-        if off >= len(self.track_list):
+        if (cyl,side) not in self.to_track:
             return None
-        bitlen, rawbytes = self.track_list[off]
+        bitlen, rawbytes = self.to_track[cyl,side]
         tdat = bitarray(endian='little')
         tdat.frombytes(rawbytes)
         track = MasterTrack(
@@ -73,22 +68,52 @@ class HFE(Image):
         return track
 
 
-    def append_track(self, track):
+    def emit_track(self, cyl, side, track):
         raw = RawTrack(clock = 5e-4 / self.bitrate, data = track)
         bits, _ = raw.get_revolution(0)
         bits.bytereverse()
-        self.track_list.append((len(bits), bits.tobytes()))
+        self.to_track[cyl,side] = (len(bits), bits.tobytes())
 
 
     def get_image(self):
 
+        n_side = 1
+        n_cyl = max(self.to_track.keys(), default=(0), key=lambda x:x[0])[0]
+        n_cyl += 1
+
+        # We dynamically build the Track-LUT and -Data arrays.
+        tlut = bytearray()
+        tdat = bytearray()
+
+        # Stuff real data into the image.
+        for i in range(n_cyl):
+            s0 = self.to_track[i,0] if (i,0) in self.to_track else None
+            s1 = self.to_track[i,1] if (i,1) in self.to_track else None
+            if s0 is None and s1 is None:
+                # Dummy data for empty cylinders. Assumes 300RPM.
+                nr_bytes = 100 * self.bitrate
+                tlut += struct.pack("<2H", len(tdat)//512 + 2, nr_bytes)
+                tdat += bytes([0x88] * (nr_bytes+0x1ff & ~0x1ff))
+            else:
+                # At least one side of this cylinder is populated.
+                if s1 is not None:
+                    n_side = 2
+                bc = [s0 if s0 is not None else (0,bytes()),
+                      s1 if s1 is not None else (0,bytes())]
+                nr_bytes = max(len(t[1]) for t in bc)
+                nr_blocks = (nr_bytes + 0xff) // 0x100
+                tlut += struct.pack("<2H", len(tdat)//512 + 2, 2 * nr_bytes)
+                for b in range(nr_blocks):
+                    for t in bc:
+                        slice = t[1][b*256:(b+1)*256]
+                        tdat += slice + bytes([0x88] * (256 - len(slice)))
+
         # Construct the image header.
-        n_cyl = self.start_cyl + len(self.track_list) // self.nr_sides
         header = struct.pack("<8s4B2H2BH",
                              b"HXCPICFE",
                              0,
                              n_cyl,
-                             self.nr_sides,
+                             n_side,
                              0xff, # unknown encoding
                              self.bitrate,
                              0,    # rpm (unused)
@@ -96,28 +121,6 @@ class HFE(Image):
                              1,    # rsvd
                              1)    # track list offset
 
-        # We dynamically build the Track-LUT and -Data arrays.
-        tlut = bytearray()
-        tdat = bytearray()
-
-        # Dummy data for unused initial cylinders. Assumes 300RPM.
-        for i in range(self.start_cyl):
-            nr_bytes = 100 * self.bitrate
-            tlut += struct.pack("<2H", len(tdat)//512 + 2, nr_bytes)
-            tdat += bytes([0x88] * (nr_bytes+0x1ff & ~0x1ff))
-
-        # Stuff real data into the image.
-        for i in range(0, len(self.track_list), self.nr_sides):
-            bc = [self.track_list[i],
-                  self.track_list[i+1] if self.nr_sides > 1 else (0,bytes())]
-            nr_bytes = max(len(t[1]) for t in bc)
-            nr_blocks = (nr_bytes + 0xff) // 0x100
-            tlut += struct.pack("<2H", len(tdat)//512 + 2, 2 * nr_bytes)
-            for b in range(nr_blocks):
-                for t in bc:
-                    slice = t[1][b*256:(b+1)*256]
-                    tdat += slice + bytes([0x88] * (256 - len(slice)))
-
         # Pad the header and TLUT to 512-byte blocks.
         header += bytes([0xff] * (0x200 - len(header)))
         tlut += bytes([0xff] * (0x200 - len(tlut)))

+ 2 - 2
scripts/greaseweazle/image/image.py

@@ -34,10 +34,10 @@ class Image:
 
     ## Default .to_file() constructor
     @classmethod
-    def to_file(cls, name, start_cyl, nr_sides):
+    def to_file(cls, name):
         error.check(not cls.read_only,
                     "%s: Cannot create %s image files" % (name, cls.__name__))
-        obj = cls(start_cyl, nr_sides)
+        obj = cls()
         obj.filename = name
         return obj
 

+ 2 - 4
scripts/greaseweazle/image/ipf.py

@@ -103,10 +103,8 @@ class IPF(Image):
 
     read_only = True
 
-    def __init__(self, start_cyl, nr_sides):
+    def __init__(self):
         self.lib = get_libcaps()
-        self.start_cyl = start_cyl
-        self.nr_sides = nr_sides
 
     def __del__(self):
         try:
@@ -140,7 +138,7 @@ class IPF(Image):
     @classmethod
     def from_file(cls, name):
 
-        ipf = cls(0, 0)
+        ipf = cls()
 
         ipf.iid = ipf.lib.CAPSAddImage()
         error.check(ipf.iid >= 0, "Could not create IPF image container")

+ 4 - 12
scripts/greaseweazle/image/kryoflux.py

@@ -41,11 +41,8 @@ class KryoFlux(Image):
 
 
     @classmethod
-    def to_file(cls, name, start_cyl, nr_sides):
-        cls = cls(name)
-        cls.cyl, cls.side = start_cyl, 0
-        cls.nr_sides = nr_sides
-        return cls
+    def to_file(cls, name):
+        return cls(name)
 
     @classmethod
     def from_file(cls, name):
@@ -151,7 +148,7 @@ class KryoFlux(Image):
         return Flux(index_list, flux_list, sck)
 
 
-    def append_track(self, track):
+    def emit_track(self, cyl, side, track):
         """Converts @track into a KryoFlux stream file."""
 
         # Check if we should insert an OOB record for the next index mark.
@@ -213,15 +210,10 @@ class KryoFlux(Image):
         dat += struct.pack('<2BH2I', Op.OOB, OOB.StreamEnd, 8, stream_idx, 0)
         dat += struct.pack('<2BH', Op.OOB, OOB.EOF, 0x0d0d)
 
-        name = self.basename + '%02d.%d.raw' % (self.cyl, self.side)
+        name = self.basename + '%02d.%d.raw' % (cyl, side)
         with open(name, 'wb') as f:
                 f.write(dat)
 
-        self.side += 1
-        if self.side >= self.nr_sides:
-            self.side = 0
-            self.cyl += 1
-        
 
     def __enter__(self):
         return self

+ 50 - 46
scripts/greaseweazle/image/scp.py

@@ -23,12 +23,16 @@ class SCP(Image):
     # 40MHz
     sample_freq = 40000000
 
-    def __init__(self, start_cyl, nr_sides):
+    def __init__(self):
         self.opts = SCPOpts()
-        self.nr_sides = nr_sides
         self.nr_revs = None
-        self.track_list = [(None,None)] * (start_cyl*2)
+        self.to_track = dict()
 
+    def side_count(self):
+        s = [0,0] # non-empty tracks on each side
+        for tnr in self.to_track:
+            s[tnr&1] += 1
+        return s
 
     @classmethod
     def from_file(cls, name):
@@ -58,14 +62,13 @@ class SCP(Image):
             error.check(off >= 0, "SCP: Bad Track Table")
             trk_offs = trk_offs[:off]
 
-        scp = cls(0, 2)
+        scp = cls()
         scp.nr_revs = nr_revs
 
         for trknr in range(len(trk_offs)):
             
             trk_off = trk_offs[trknr]
             if trk_off == 0:
-                scp.track_list.append((None, None))
                 continue
 
             # Parse the SCP track header and extract the flux data.
@@ -81,41 +84,30 @@ class SCP(Image):
             if s_off == e_off:
                 # FluxEngine creates dummy TDHs for empty tracks.
                 # Bail on them here.
-                scp.track_list.append((None, None))
                 continue
-                
+
             tdat = dat[trk_off+s_off:trk_off+e_off]
-            scp.track_list.append((thdr[4:], tdat))
+            scp.to_track[trknr] = (thdr[4:], tdat)
+
 
-        # s[side] is True iff there are non-empty tracks on @side
-        s = []
-        for i in range(2):
-            s.append(functools.reduce(lambda x, y: x or (y[1] is not None),
-                                      scp.track_list[i::2], False))
-            
         # Some tools produce (or used to produce) single-sided images using
         # consecutive entries in the TLUT. This needs fixing up.
-        if single_sided and functools.reduce(lambda x, y: x and y, s):
-            new_list = []
-            for t in scp.track_list[:84]:
-                if single_sided != 1: # Side 1
-                    new_list.append((None, None))
-                new_list.append(t)
-                if single_sided == 1: # Side 0
-                    new_list.append((None, None))
-            scp.track_list = new_list
+        s = scp.side_count()
+        if single_sided and s[0] and s[1]:
+            new_dict = dict()
+            for tnr in scp.to_track:
+                new_dict[tnr*2+single_sided-1] = scp.to_track[tnr]
+            scp.to_track = new_dict
             print('SCP: Imported legacy single-sided image')
             
         return scp
 
 
     def get_track(self, cyl, side):
-        off = cyl*2 + side
-        if off >= len(self.track_list):
-            return None
-        tdh, dat = self.track_list[off]
-        if dat is None:
+        tracknr = cyl * 2 + side
+        if not tracknr in self.to_track:
             return None
+        tdh, dat = self.to_track[tracknr]
 
         index_list = []
         while tdh:
@@ -137,16 +129,11 @@ class SCP(Image):
         return Flux(index_list, flux_list, SCP.sample_freq)
 
 
-    def append_track(self, track):
+    def emit_track(self, cyl, side, track):
         """Converts @track into a Supercard Pro Track and appends it to
         the current image-in-progress.
         """
 
-        def _append(self, tdh, dat):
-            self.track_list.append((tdh, dat))
-            if self.nr_sides == 1:
-                self.track_list.append((None, None))
-
         flux = track.flux()
 
         nr_revs = len(flux.index_list)
@@ -176,7 +163,7 @@ class SCP(Image):
                 rev += 1
                 if rev >= nr_revs:
                     # We're done: We simply discard any surplus flux samples
-                    _append(self, tdh, dat)
+                    self.to_track[cyl*2+side] = (tdh, dat)
                     return
                 to_index += flux.index_list[rev]
 
@@ -203,44 +190,61 @@ class SCP(Image):
             len_at_index = len(dat)
             rev += 1
 
-        _append(self, tdh, dat)
+        self.to_track[cyl*2+side] = (tdh, dat)
 
 
     def get_image(self):
-        single_sided = 1 if self.nr_sides == 1 else 0
-        track_list = self.track_list
+
+        # Work out the single-sided byte code
+        s = self.side_count()
+        if s[0] and s[1]:
+            single_sided = 0
+        elif s[0]:
+            single_sided = 1
+        else:
+            single_sided = 2
+
+        to_track = self.to_track
         if single_sided and self.opts.legacy_ss:
             print('SCP: Generated legacy single-sided image')
-            track_list = track_list[::2]
+            to_track = dict()
+            for tnr in self.to_track:
+                to_track[tnr//2] = self.to_track[tnr]
+
+        ntracks = max(to_track, default=0) + 1
+
         # Generate the TLUT and concatenate all the tracks together.
         trk_offs = bytearray()
         trk_dat = bytearray()
-        for trknr in range(len(track_list)):
-            tdh, dat = track_list[trknr]
-            if dat is None:
-                trk_offs += struct.pack("<I", 0)
-            else:
+        for tnr in range(ntracks):
+            if tnr in to_track:
+                tdh, dat = to_track[tnr]
                 trk_offs += struct.pack("<I", 0x2b0 + len(trk_dat))
-                trk_dat += struct.pack("<3sB", b"TRK", trknr) + tdh + dat
+                trk_dat += struct.pack("<3sB", b"TRK", tnr) + tdh + dat
+            else:
+                trk_offs += struct.pack("<I", 0)
         error.check(len(trk_offs) <= 0x2a0, "SCP: Too many tracks")
         trk_offs += bytes(0x2a0 - len(trk_offs))
+
         # Calculate checksum over all data (except 16-byte image header).
         csum = 0
         for x in trk_offs:
             csum += x
         for x in trk_dat:
             csum += x
+
         # Generate the image header.
         header = struct.pack("<3s9BI",
                              b"SCP",    # Signature
                              0,         # Version
                              0x80,      # DiskType = Other
-                             self.nr_revs, 0, len(track_list) - 1,
+                             self.nr_revs, 0, ntracks-1,
                              0x03,      # Flags = Index, 96TPI
                              0,         # 16-bit cell width
                              single_sided,
                              0,         # 25ns capture
                              csum & 0xffffffff)
+
         # Concatenate it all together and send it back.
         return header + trk_offs + trk_dat
 

+ 2 - 2
scripts/greaseweazle/tools/read.py

@@ -19,7 +19,7 @@ from greaseweazle.flux import Flux
 
 
 def open_image(args, image_class):
-    image = image_class.to_file(args.file, args.scyl, args.nr_sides)
+    image = image_class.to_file(args.file)
     if args.rate is not None:
         image.bitrate = args.rate
     for opt, val in args.file_opts.items():
@@ -112,7 +112,7 @@ def read_to_image(usb, args, image, decoder=None):
             dat = read_with_retry(usb, args, cyl, side, decoder)
             print("T%u.%u: %s" % (cyl, side, dat.summary_string()))
             summary[side].append(dat)
-            image.append_track(dat)
+            image.emit_track(cyl, side, dat)
 
     if decoder is not None:
         print_summary(args, summary)

+ 8 - 0
scripts/misc/sw_test.sh

@@ -39,6 +39,14 @@ disk-analyse a.adf a.hfe
 disk-analyse -e 2 b.hfe b.adf
 diff a.adf b.adf
 md5sum a.adf b.adf
+
+# Read Kryoflux
+mkdir a
+./gw --bt read --revs=1 --ecyl=2 a/
+disk-analyse -e 2 a/ b.adf
+diff a.adf b.adf
+md5sum a.adf b.adf
 rm -f b.adf c.adf a.hfe b.hfe
+rm -rf a
 
 rm -f a.adf