https://vimsky.com/examples/detail/python-ex-rangelib-RangeSet---class.html
Python rangelib.RangeSet Class Code Examples
This article collects typical usage examples of the Python rangelib.RangeSet class. If you are wondering what the RangeSet class does and how it is used in practice, the selected code examples below may help.
The following presents 15 code examples of the RangeSet class, ordered by popularity by default.
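All of the examples below lean on a handful of RangeSet operations. As a quick orientation, here is a minimal sketch of those operations; it assumes the rangelib module from the AOSP build tools (build/tools/releasetools) is importable, and the values are made up for illustration:

from rangelib import RangeSet

a = RangeSet("10-19")       # blocks 10 through 19, inclusive
b = RangeSet("15-24 30")    # two runs: 15-24 and the single block 30

print(a.size())             # 10
print(a.overlaps(b))        # True -- they share blocks 15-19
print(a.union(b))           # 10-24 30
print(a.subtract(b))        # 10-14
print(a.intersect(b))       # 15-19
print(a.to_string_raw())    # "2,10,20" -- count, then half-open [start, end) pairs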
Example 1: test_CanUseImgdiff_ineligible
def test_CanUseImgdiff_ineligible(self):
  # Disabled by caller.
  block_image_diff = BlockImageDiff(EmptyImage(), EmptyImage(),
                                    disable_imgdiff=True)
  self.assertFalse(
      block_image_diff.CanUseImgdiff(
          "/system/app/app1.apk", RangeSet("10-15"), RangeSet("0-5")))

  # Unsupported file type.
  block_image_diff = BlockImageDiff(EmptyImage(), EmptyImage())
  self.assertFalse(
      block_image_diff.CanUseImgdiff(
          "/system/bin/gzip", RangeSet("10-15"), RangeSet("0-5")))

  # At least one of the ranges is in non-monotonic order.
  self.assertFalse(
      block_image_diff.CanUseImgdiff(
          "/system/app/app2.apk", RangeSet("10-15"),
          RangeSet("15-20 30 10-14")))

  # At least one of the ranges is incomplete.
  src_ranges = RangeSet("0-5")
  src_ranges.extra['incomplete'] = True
  self.assertFalse(
      block_image_diff.CanUseImgdiff(
          "/vendor/app/app4.apk", RangeSet("10-15"), src_ranges))

  # The stats are correctly logged.
  self.assertDictEqual(
      {
          ImgdiffStats.SKIPPED_NONMONOTONIC: {'/system/app/app2.apk'},
          ImgdiffStats.SKIPPED_INCOMPLETE: {'/vendor/app/app4.apk'},
      },
      block_image_diff.imgdiff_stats.stats)
Author: rcstar6696, Project: platform_build, Lines: 34, Source: test_blockimgdiff.py
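The non-monotonic rejection above hinges on RangeSet remembering whether the string it was parsed from listed its ranges in increasing order (its monotonic attribute). A small, hypothetical illustration, assuming the AOSP rangelib semantics:

from rangelib import RangeSet

print(RangeSet("10-15").monotonic)           # True
print(RangeSet("15-20 30 10-14").monotonic)  # False -- "10-14" appears after "30"
# CanUseImgdiff() refuses to use imgdiff for such files and records them
# under ImgdiffStats.SKIPPED_NONMONOTONIC, as the test asserts above.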
Example 2: AssertSequenceGood
def AssertSequenceGood(self):
  # Simulate the sequences of transfers we will output, and check that:
  # - we never read a block after writing it, and
  # - we write every block we care about exactly once.

  # Start with no blocks having been touched yet.
  touched = RangeSet()

  # Imagine processing the transfers in order.
  for xf in self.transfers:
    # Check that the input blocks for this transfer haven't yet been touched.
    x = xf.src_ranges
    if self.version >= 2:
      for _, sr in xf.use_stash:
        x = x.subtract(sr)
    assert not touched.overlaps(x)

    # Check that the output blocks for this transfer haven't yet been touched.
    assert not touched.overlaps(xf.tgt_ranges)

    # Touch all the blocks written by this transfer.
    touched = touched.union(xf.tgt_ranges)

  # Check that we've written every target block.
  assert touched == self.tgt.care_map
Author: Aaahh, Project: android-build, Lines: 25, Source: blockimgdiff.py
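The invariant being checked boils down to the overlaps()/union() pattern on RangeSet. A stripped-down sketch with plain tuples standing in for Transfer objects (hypothetical data, not from blockimgdiff):

transfers = [(RangeSet("0-9"), RangeSet("20-29")),    # (src_ranges, tgt_ranges)
             (RangeSet("10-19"), RangeSet("0-9"))]
touched = RangeSet()
for src, tgt in transfers:
  assert not touched.overlaps(src)   # never read a block after it has been written
  assert not touched.overlaps(tgt)   # never write the same block twice
  touched = touched.union(tgt)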
Example 3: AssertPartition
def AssertPartition(total, seq):
  """Assert that all the RangeSets in 'seq' form a partition of the
  'total' RangeSet (ie, they are nonintersecting and their union
  equals 'total')."""
  so_far = RangeSet()
  for i in seq:
    assert not so_far.overlaps(i)
    so_far = so_far.union(i)
  assert so_far == total
Author: Aaahh, Project: android-build, Lines: 9, Source: blockimgdiff.py
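For instance, the helper would accept a clean split of a range and reject overlapping or incomplete ones (hypothetical calls, not from the original sources):

AssertPartition(RangeSet("0-9"), [RangeSet("0-3"), RangeSet("4-9")])  # passes
AssertPartition(RangeSet("0-9"), [RangeSet("0-5"), RangeSet("4-9")])  # fails: pieces overlap
AssertPartition(RangeSet("0-9"), [RangeSet("0-3"), RangeSet("5-9")])  # fails: union != total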
Example 4: test_ValidateFileConsistency_incompleteRange
def test_ValidateFileConsistency_incompleteRange(self):
  input_tmp = common.MakeTempDir()
  os.mkdir(os.path.join(input_tmp, 'IMAGES'))
  system_image = os.path.join(input_tmp, 'IMAGES', 'system.img')
  system_root = os.path.join(input_tmp, "SYSTEM")
  os.mkdir(system_root)

  # Write test files that contain multiple blocks of zeros; the zero blocks
  # will be omitted by the kernel, and each test file will occupy one block
  # range in the final system image.
  with open(os.path.join(system_root, 'a'), 'w') as f:
    f.write("aaa")
    f.write('\0' * 4096 * 3)
  with open(os.path.join(system_root, 'b'), 'w') as f:
    f.write("bbb")
    f.write('\0' * 4096 * 3)

  raw_file_map = os.path.join(input_tmp, 'IMAGES', 'raw_system.map')
  self._generate_system_image(system_image, system_root, raw_file_map)

  # Parse the generated file map and update the block ranges for each file.
  file_map_list = {}
  image_ranges = RangeSet()
  with open(raw_file_map, 'r') as f:
    for line in f.readlines():
      info = line.split()
      self.assertEqual(2, len(info))
      image_ranges = image_ranges.union(RangeSet(info[1]))
      file_map_list[info[0]] = RangeSet(info[1])

  # Add one unoccupied block as the shared block for all test files.
  mock_shared_block = RangeSet("10-20").subtract(image_ranges).first(1)
  with open(os.path.join(input_tmp, 'IMAGES', 'system.map'), 'w') as f:
    for key in sorted(file_map_list.keys()):
      line = "{} {}\n".format(
          key, file_map_list[key].union(mock_shared_block))
      f.write(line)

  # Prepare the target zip file.
  input_file = common.MakeTempFile()
  all_entries = ['SYSTEM/', 'SYSTEM/b', 'SYSTEM/a', 'IMAGES/',
                 'IMAGES/system.map', 'IMAGES/system.img']
  with zipfile.ZipFile(input_file, 'w') as input_zip:
    for name in all_entries:
      input_zip.write(os.path.join(input_tmp, name), arcname=name)

  input_zip = zipfile.ZipFile(input_file, 'r')
  info_dict = {'extfs_sparse_flag': '-s'}

  # Expect the validation to pass; both files are skipped due to the
  # 'incomplete' block range.
  ValidateFileConsistency(input_zip, input_tmp, info_dict)
Author: android, Project: platform_build, Lines: 52, Source: test_validate_target_files.py
Example 5: test_parse_raw
def test_parse_raw(self):
  self.assertEqual(
      RangeSet.parse_raw(RangeSet("0-9").to_string_raw()),
      RangeSet("0-9"))
  self.assertEqual(
      RangeSet.parse_raw(RangeSet("2-10 12").to_string_raw()),
      RangeSet("2-10 12"))
  self.assertEqual(
      RangeSet.parse_raw(RangeSet("11 2-10 12 1 0").to_string_raw()),
      RangeSet("11 2-10 12 1 0"))

  with self.assertRaises(AssertionError):
    RangeSet.parse_raw("4,0,10")
Author: shkschneider, Project: android_build, Lines: 13, Source: test_rangelib.py
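As far as can be told from rangelib, the raw form is a comma-separated list whose first element is the number of values that follow, and the values are half-open [start, end) pairs; parse_raw() asserts that the count matches, which is why "4,0,10" fails above. A short sketch:

print(RangeSet("0-9").to_string_raw())      # "2,0,10"
print(RangeSet("2-10 12").to_string_raw())  # "4,2,11,12,13"
print(RangeSet.parse_raw("2,0,10"))         # 0-9
# "4,0,10" declares 4 values but supplies only 2, hence the AssertionError.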
Example 6: __init__
def __init__(self):
  self.blocksize = 4096
  self.care_map = RangeSet()
  self.clobbered_blocks = RangeSet()
  self.extended = RangeSet()
  self.total_blocks = 0
  self.file_map = {}
Author: vicamo, Project: aosp_platform_build, Lines: 7, Source: blockimgdiff.py
Example 7: EmptyImage
class EmptyImage(Image):
  """A zero-length image."""

  def __init__(self):
    self.blocksize = 4096
    self.care_map = RangeSet()
    self.clobbered_blocks = RangeSet()
    self.extended = RangeSet()
    self.total_blocks = 0
    self.file_map = {}

  def RangeSha1(self, ranges):
    return sha1().hexdigest()

  def ReadRangeSet(self, ranges):
    return ()

  def TotalSha1(self, include_clobbered_blocks=False):
    # EmptyImage always carries empty clobbered_blocks, so
    # include_clobbered_blocks can be ignored.
    assert self.clobbered_blocks.size() == 0
    return sha1().hexdigest()

  def WriteRangeDataToFd(self, ranges, fd):
    raise ValueError("Can't write data from EmptyImage to file")
Author: vicamo, Project: aosp_platform_build, Lines: 25, Source: blockimgdiff.py
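EmptyImage mostly serves as the stand-in source image for full OTAs (see Example 8, where src defaults to it). A hypothetical check of its trivial behavior:

img = EmptyImage()
print(img.care_map.size())                           # 0 -- nothing to care about
print(img.ReadRangeSet(RangeSet()))                   # ()
print(img.TotalSha1() == img.RangeSha1(RangeSet()))  # True, both hash empty input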
Example 8: __init__
def __init__(self, tgt, src=None, threads=None, version=4,
             disable_imgdiff=False):
  if threads is None:
    threads = multiprocessing.cpu_count() // 2
    if threads == 0:
      threads = 1
  self.threads = threads
  self.version = version
  self.transfers = []
  self.src_basenames = {}
  self.src_numpatterns = {}
  self._max_stashed_size = 0
  self.touched_src_ranges = RangeSet()
  self.touched_src_sha1 = None
  self.disable_imgdiff = disable_imgdiff

  assert version in (1, 2, 3, 4)

  self.tgt = tgt
  if src is None:
    src = EmptyImage()
  self.src = src

  # The updater code that installs the patch always uses 4k blocks.
  assert tgt.blocksize == 4096
  assert src.blocksize == 4096

  # The range sets in each filemap should comprise a partition of
  # the care map.
  self.AssertPartition(src.care_map, src.file_map.values())
  self.AssertPartition(tgt.care_map, tgt.file_map.values())
Author: ED6E0F17, Project: platform_build, Lines: 31, Source: blockimgdiff.py
Example 9: DataImage
class DataImage(Image):
  """An image wrapped around a single string of data."""

  def __init__(self, data, trim=False, pad=False):
    self.data = data
    self.blocksize = 4096

    assert not (trim and pad)

    partial = len(self.data) % self.blocksize
    if partial > 0:
      if trim:
        self.data = self.data[:-partial]
      elif pad:
        self.data += '\0' * (self.blocksize - partial)
      else:
        raise ValueError(("data for DataImage must be multiple of %d bytes "
                          "unless trim or pad is specified") %
                         (self.blocksize,))

    assert len(self.data) % self.blocksize == 0

    self.total_blocks = len(self.data) / self.blocksize
    self.care_map = RangeSet(data=(0, self.total_blocks))
    self.clobbered_blocks = RangeSet()
    self.extended = RangeSet()

    zero_blocks = []
    nonzero_blocks = []
    reference = '\0' * self.blocksize
    for i in range(self.total_blocks):
      d = self.data[i*self.blocksize : (i+1)*self.blocksize]
      if d == reference:
        zero_blocks.append(i)
        zero_blocks.append(i+1)
      else:
        nonzero_blocks.append(i)
        nonzero_blocks.append(i+1)

    self.file_map = {"__ZERO": RangeSet(zero_blocks),
                     "__NONZERO": RangeSet(nonzero_blocks)}

  def ReadRangeSet(self, ranges):
    return [self.data[s*self.blocksize:e*self.blocksize] for (s, e) in ranges]

  def TotalSha1(self, include_clobbered_blocks=False):
    # DataImage always carries empty clobbered_blocks, so
    # include_clobbered_blocks can be ignored.
    assert self.clobbered_blocks.size() == 0
    return sha1(self.data).hexdigest()
Author: nrichard7029, Project: platform_build, Lines: 51, Source: blockimgdiff.py
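A quick sketch of how this constructor partitions the data into the __ZERO and __NONZERO pseudo-files (made-up data; note the code is Python 2 style, so data is a byte string and len(data) / blocksize is integer division):

blocksize = 4096
data = 'a' * blocksize + '\0' * blocksize + 'b' * blocksize  # three blocks
img = DataImage(data)

print(img.total_blocks)           # 3
print(img.care_map)               # 0-2
print(img.file_map["__ZERO"])     # 1      (the all-zero middle block)
print(img.file_map["__NONZERO"])  # 0 2    (the two non-zero blocks)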
Example 10: __init__
def __init__(self, data, trim=False, pad=False):
  self.data = data
  self.blocksize = 4096

  assert not (trim and pad)

  partial = len(self.data) % self.blocksize
  if partial > 0:
    if trim:
      self.data = self.data[:-partial]
    elif pad:
      self.data += '\0' * (self.blocksize - partial)
    else:
      raise ValueError(("data for DataImage must be multiple of %d bytes "
                        "unless trim or pad is specified") %
                       (self.blocksize,))

  assert len(self.data) % self.blocksize == 0

  self.total_blocks = len(self.data) / self.blocksize
  self.care_map = RangeSet(data=(0, self.total_blocks))
  self.clobbered_blocks = RangeSet()
  self.extended = RangeSet()

  zero_blocks = []
  nonzero_blocks = []
  reference = '\0' * self.blocksize
  for i in range(self.total_blocks):
    d = self.data[i*self.blocksize : (i+1)*self.blocksize]
    if d == reference:
      zero_blocks.append(i)
      zero_blocks.append(i+1)
    else:
      nonzero_blocks.append(i)
      nonzero_blocks.append(i+1)

  self.file_map = {"__ZERO": RangeSet(zero_blocks),
                   "__NONZERO": RangeSet(nonzero_blocks)}
Author: nrichard7029, Project: platform_build, Lines: 39, Source: blockimgdiff.py
Example 11: WriteTransfers
# ......... some code omitted here .........
mapped_stashes = []
for s, sr in xf.use_stash:
  # TODO: We don't need 'sid' (nor free_stash_ids) in BBOTA v3+.
  sid = stashes.pop(s)
  unstashed_src_ranges = unstashed_src_ranges.subtract(sr)
  sh = self.HashBlocks(self.src, sr)
  sr = xf.src_ranges.map_within(sr)
  mapped_stashes.append(sr)
  if self.version == 2:
    src_str.append("%d:%s" % (sid, sr.to_string_raw()))
    # A stash will be used only once. We need to free the stash
    # immediately after the use, instead of waiting for the automatic
    # clean-up at the end. Because otherwise it may take up extra space
    # and lead to OTA failures.
    # Bug: 23119955
    free_string.append("free %d\n" % (sid,))
    free_size += sr.size()
  else:
    assert sh in stashes
    src_str.append("%s:%s" % (sh, sr.to_string_raw()))
    stashes[sh] -= 1
    if stashes[sh] == 0:
      free_size += sr.size()
      free_string.append("free %s\n" % (sh,))
      stashes.pop(sh)
  heapq.heappush(free_stash_ids, sid)

if unstashed_src_ranges:
  src_str.insert(1, unstashed_src_ranges.to_string_raw())
  if xf.use_stash:
    mapped_unstashed = xf.src_ranges.map_within(unstashed_src_ranges)
    src_str.insert(2, mapped_unstashed.to_string_raw())
    mapped_stashes.append(mapped_unstashed)
    self.AssertPartition(RangeSet(data=(0, size)), mapped_stashes)
else:
  src_str.insert(1, "-")
  self.AssertPartition(RangeSet(data=(0, size)), mapped_stashes)

src_str = " ".join(src_str)

# all versions:
#   zero <rangeset>
#   new <rangeset>
#   erase <rangeset>
#
# version 1:
#   bsdiff patchstart patchlen <src rangeset> <tgt rangeset>
#   imgdiff patchstart patchlen <src rangeset> <tgt rangeset>
#   move <src rangeset> <tgt rangeset>
#
# version 2:
#   bsdiff patchstart patchlen <tgt rangeset> <src_str>
#   imgdiff patchstart patchlen <tgt rangeset> <src_str>
#   move <tgt rangeset> <src_str>
#
# version 3:
#   bsdiff patchstart patchlen srchash tgthash <tgt rangeset> <src_str>
#   imgdiff patchstart patchlen srchash tgthash <tgt rangeset> <src_str>
#   move hash <tgt rangeset> <src_str>

tgt_size = xf.tgt_ranges.size()

if xf.style == "new":
  assert xf.tgt_ranges
  assert tgt_size == WriteSplitTransfers(out, xf.style, xf.tgt_ranges)
  total += tgt_size
Author: ED6E0F17, Project: platform_build, Lines: 67, Source: blockimgdiff.py
Example 12: ParseTransferList
def ParseTransferList(self, name):
  """Simulate the transfer commands and calculate the amount of I/O."""
  logging.info("\nSimulating commands in '{}':".format(name))
  lines = self.package.read(name).strip().splitlines()

  assert len(lines) >= 4, "{} is too short; Transfer list expects at least " \
      "4 lines, it has {}".format(name, len(lines))
  assert int(lines[0]) >= 3
  logging.info("(version: {})".format(lines[0]))

  blocks_written = 0
  my_stash = Stash()
  for line in lines[4:]:
    cmd_list = line.strip().split(" ")
    cmd_name = cmd_list[0]
    try:
      if cmd_name == "new" or cmd_name == "zero":
        assert len(cmd_list) == 2, "command format error: {}".format(line)
        target_range = RangeSet.parse_raw(cmd_list[1])
        blocks_written += target_range.size()
      elif cmd_name == "move":
        # Example: move <onehash> <tgt_range> <src_blk_count> <src_range>
        # [<loc_range> <stashed_blocks>]
        assert len(cmd_list) >= 5, "command format error: {}".format(line)
        target_range = RangeSet.parse_raw(cmd_list[2])
        blocks_written += target_range.size()
        if cmd_list[4] == '-':
          continue
        SHA1 = cmd_list[1]
        source_range = RangeSet.parse_raw(cmd_list[4])
        if target_range.overlaps(source_range):
          my_stash.HandleOverlapBlocks(SHA1, source_range)
      elif cmd_name == "bsdiff" or cmd_name == "imgdiff":
        # Example: bsdiff <offset> <len> <src_hash> <tgt_hash> <tgt_range>
        # <src_blk_count> <src_range> [<loc_range> <stashed_blocks>]
        assert len(cmd_list) >= 8, "command format error: {}".format(line)
        target_range = RangeSet.parse_raw(cmd_list[5])
        blocks_written += target_range.size()
        if cmd_list[7] == '-':
          continue
        source_SHA1 = cmd_list[3]
        source_range = RangeSet.parse_raw(cmd_list[7])
        if target_range.overlaps(source_range):
          my_stash.HandleOverlapBlocks(source_SHA1, source_range)
      elif cmd_name == "stash":
        assert len(cmd_list) == 3, "command format error: {}".format(line)
        SHA1 = cmd_list[1]
        source_range = RangeSet.parse_raw(cmd_list[2])
        my_stash.StashBlocks(SHA1, source_range)
      elif cmd_name == "free":
        assert len(cmd_list) == 2, "command format error: {}".format(line)
        SHA1 = cmd_list[1]
        my_stash.FreeBlocks(SHA1)
    except:
      logging.error("failed to parse command in: " + line)
      raise

  self.block_written += blocks_written
  self.block_stashed += my_stash.blocks_stashed
  logging.info("blocks written: {} (expected: {})".format(
      blocks_written, lines[1]))
  logging.info("max blocks stashed simultaneously: {} (expected: {})".format(
      my_stash.max_stash_needed, lines[3]))
  logging.info("total blocks stashed: {}".format(my_stash.blocks_stashed))
  logging.info("blocks stashed implicitly: {}".format(
      my_stash.overlap_blocks_stashed))
Author: MIPS, Project: build, Lines: 67, Source: ota_package_parser.py
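For illustration, a made-up minimal transfer list that the command loop above would accept; the header layout (version, total blocks written, then two stash figures) is inferred from the lines[0], lines[1] and lines[3] indices used in the logging, so treat it as an assumption:

transfer_list = "\n".join([
    "4",           # version
    "10",          # total blocks written
    "0",           # stash entries needed simultaneously (assumed meaning)
    "0",           # maximum blocks stashed simultaneously
    "new 2,0,10",  # write 10 fresh blocks covering the range 0-9
])
# Fed through ParseTransferList, this would log: blocks written: 10 (expected: 10).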
Example 13: BlockImageDiff
class BlockImageDiff(object):
  def __init__(self, tgt, src=None, threads=None, version=4,
               disable_imgdiff=False):
    if threads is None:
      threads = multiprocessing.cpu_count() // 2
      if threads == 0:
        threads = 1
    self.threads = threads
    self.version = version
    self.transfers = []
    self.src_basenames = {}
    self.src_numpatterns = {}
    self._max_stashed_size = 0
    self.touched_src_ranges = RangeSet()
    self.touched_src_sha1 = None
    self.disable_imgdiff = disable_imgdiff

    assert version in (1, 2, 3, 4)

    self.tgt = tgt
    if src is None:
      src = EmptyImage()
    self.src = src

    # The updater code that installs the patch always uses 4k blocks.
    assert tgt.blocksize == 4096
    assert src.blocksize == 4096

    # The range sets in each filemap should comprise a partition of
    # the care map.
    self.AssertPartition(src.care_map, src.file_map.values())
    self.AssertPartition(tgt.care_map, tgt.file_map.values())

  @property
  def max_stashed_size(self):
    return self._max_stashed_size

  def Compute(self, prefix):
    # When looking for a source file to use as the diff input for a
    # target file, we try:
    #   1) an exact path match if available, otherwise
    #   2) a exact basename match if available, otherwise
    #   3) a basename match after all runs of digits are replaced by
    #      "#" if available, otherwise
    #   4) we have no source for this target.
    self.AbbreviateSourceNames()
    self.FindTransfers()

    # Find the ordering dependencies among transfers (this is O(n^2)
    # in the number of transfers).
    self.GenerateDigraph()
    # Find a sequence of transfers that satisfies as many ordering
    # dependencies as possible (heuristically).
    self.FindVertexSequence()
    # Fix up the ordering dependencies that the sequence didn't
    # satisfy.
    if self.version == 1:
      self.RemoveBackwardEdges()
    else:
      self.ReverseBackwardEdges()
      self.ImproveVertexSequence()

    # Ensure the runtime stash size is under the limit.
    if self.version >= 2 and common.OPTIONS.cache_size is not None:
      self.ReviseStashSize()

    # Double-check our work.
    self.AssertSequenceGood()

    self.ComputePatches(prefix)
    self.WriteTransfers(prefix)

  def HashBlocks(self, source, ranges):  # pylint: disable=no-self-use
    data = source.ReadRangeSet(ranges)
    ctx = sha1()
    for p in data:
      ctx.update(p)
    return ctx.hexdigest()

  def WriteTransfers(self, prefix):
    def WriteSplitTransfers(out, style, target_blocks):
      """Limit the size of operand in command 'new' and 'zero' to 1024 blocks.

      This prevents the target size of one command from being too large; and
      might help to avoid fsync errors on some devices."""
      assert (style == "new" or style == "zero")
      blocks_limit = 1024
      total = 0
      while target_blocks:
        blocks_to_write = target_blocks.first(blocks_limit)
        out.append("%s %s\n" % (style, blocks_to_write.to_string_raw()))
        total += blocks_to_write.size()
        target_blocks = target_blocks.subtract(blocks_to_write)
      return total

    out = []
    # ......... some code omitted here .........
Author: ED6E0F17, Project: platform_build, Lines: 101, Source: blockimgdiff.py
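Putting the pieces together, a hypothetical driver for this class might look like the sketch below; tgt_img and src_img stand for image objects built elsewhere (e.g. sparse_img.SparseImage, or the DataImage/EmptyImage classes shown in other examples), and the output file names follow the prefix convention used by ComputePatches/WriteTransfers rather than a documented contract:

block_image_diff = BlockImageDiff(tgt_img, src_img, version=4)
block_image_diff.Compute("out/system")   # expected to emit out/system.transfer.list etc.
print(block_image_diff.max_stashed_size)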
Example 14: AddTransfer
def AddTransfer(tgt_name, src_name, tgt_ranges, src_ranges, style, by_id,
                split=False):
  """Wrapper function for adding a Transfer()."""

  # We specialize diff transfers only (which covers bsdiff/imgdiff/move);
  # otherwise add the Transfer() as is.
  if style != "diff" or not split:
    Transfer(tgt_name, src_name, tgt_ranges, src_ranges, style, by_id)
    return

  # Handle .odex files specially to analyze the block-wise difference. If
  # most of the blocks are identical with only few changes (e.g. header),
  # we will patch the changed blocks only. This avoids stashing unchanged
  # blocks while patching. We limit the analysis to files without size
  # changes only. This is to avoid sacrificing the OTA generation cost too
  # much.
  if (tgt_name.split(".")[-1].lower() == 'odex' and
      tgt_ranges.size() == src_ranges.size()):
    # 0.5 threshold can be further tuned. The tradeoff is: if only very
    # few blocks remain identical, we lose the opportunity to use imgdiff
    # that may have better compression ratio than bsdiff.
    crop_threshold = 0.5

    tgt_skipped = RangeSet()
    src_skipped = RangeSet()
    tgt_size = tgt_ranges.size()
    tgt_changed = 0
    for src_block, tgt_block in zip(src_ranges.next_item(),
                                    tgt_ranges.next_item()):
      src_rs = RangeSet(str(src_block))
      tgt_rs = RangeSet(str(tgt_block))
      if self.src.ReadRangeSet(src_rs) == self.tgt.ReadRangeSet(tgt_rs):
        tgt_skipped = tgt_skipped.union(tgt_rs)
        src_skipped = src_skipped.union(src_rs)
      else:
        tgt_changed += tgt_rs.size()

      # Terminate early if no clear sign of benefits.
      if tgt_changed > tgt_size * crop_threshold:
        break

    if tgt_changed < tgt_size * crop_threshold:
      assert tgt_changed + tgt_skipped.size() == tgt_size
      print('%10d %10d (%6.2f%%) %s' % (tgt_skipped.size(), tgt_size,
                                        tgt_skipped.size() * 100.0 / tgt_size,
                                        tgt_name))
      AddSplitTransfers(
          "%s-skipped" % (tgt_name,),
          "%s-skipped" % (src_name,),
          tgt_skipped, src_skipped, style, by_id)

      # Intentionally change the file extension to avoid being imgdiff'd as
      # the files are no longer in their original format.
      tgt_name = "%s-cropped" % (tgt_name,)
      src_name = "%s-cropped" % (src_name,)
      tgt_ranges = tgt_ranges.subtract(tgt_skipped)
      src_ranges = src_ranges.subtract(src_skipped)

      # Possibly having no changed blocks.
      if not tgt_ranges:
        return

  # Add the transfer(s).
  AddSplitTransfers(
      tgt_name, src_name, tgt_ranges, src_ranges, style, by_id)
Author: ED6E0F17, Project: platform_build, Lines: 65, Source: blockimgdiff.py
Example 15: DataImage
class DataImage(Image):
  """An image wrapped around a single string of data."""

  def __init__(self, data, trim=False, pad=False):
    self.data = data
    self.blocksize = 4096

    assert not (trim and pad)

    partial = len(self.data) % self.blocksize
    padded = False
    if partial > 0:
      if trim:
        self.data = self.data[:-partial]
      elif pad:
        self.data += '\0' * (self.blocksize - partial)
        padded = True
      else:
        raise ValueError(("data for DataImage must be multiple of %d bytes "
                          "unless trim or pad is specified") %
                         (self.blocksize,))

    assert len(self.data) % self.blocksize == 0

    self.total_blocks = len(self.data) / self.blocksize
    self.care_map = RangeSet(data=(0, self.total_blocks))

    # When the last block is padded, we always write the whole block even for
    # incremental OTAs. Because otherwise the last block may get skipped if
    # unchanged for an incremental, but would fail the post-install
    # verification if it has non-zero contents in the padding bytes.
    # Bug: 23828506
    if padded:
      clobbered_blocks = [self.total_blocks-1, self.total_blocks]
    else:
      clobbered_blocks = []
    self.clobbered_blocks = clobbered_blocks
    self.extended = RangeSet()

    zero_blocks = []
    nonzero_blocks = []
    reference = '\0' * self.blocksize
    for i in range(self.total_blocks-1 if padded else self.total_blocks):
      d = self.data[i*self.blocksize : (i+1)*self.blocksize]
      if d == reference:
        zero_blocks.append(i)
        zero_blocks.append(i+1)
      else:
        nonzero_blocks.append(i)
        nonzero_blocks.append(i+1)

    assert zero_blocks or nonzero_blocks or clobbered_blocks

    self.file_map = dict()
    if zero_blocks:
      self.file_map["__ZERO"] = RangeSet(data=zero_blocks)
    if nonzero_blocks:
      self.file_map["__NONZERO"] = RangeSet(data=nonzero_blocks)
    if clobbered_blocks:
      self.file_map["__COPY"] = RangeSet(data=clobbered_blocks)

  def _GetRangeData(self, ranges):
    for s, e in ranges:
      yield self.data[s*self.blocksize:e*self.blocksize]

  def RangeSha1(self, ranges):
    h = sha1()
    for data in self._GetRangeData(ranges):
      h.update(data)
    return h.hexdigest()

  def ReadRangeSet(self, ranges):
    return [self._GetRangeData(ranges)]

  def TotalSha1(self, include_clobbered_blocks=False):
    if not include_clobbered_blocks:
      return self.RangeSha1(self.care_map.subtract(self.clobbered_blocks))
    else:
      return sha1(self.data).hexdigest()

  def WriteRangeDataToFd(self, ranges, fd):
    for data in self._GetRangeData(ranges):
      fd.write(data)
Author: vicamo, Project: aosp_platform_build, Lines: 83, Source: blockimgdiff.py
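A short sketch of the padding path in this newer DataImage (made-up data; Python 2 string semantics as in the original): the padded last block lands in __COPY so that incremental OTAs always rewrite it, per the Bug 23828506 comment above.

blocksize = 4096
data = 'a' * blocksize + 'abc'     # one full block plus 3 trailing bytes
img = DataImage(data, pad=True)    # last block gets padded with '\0'

print(img.total_blocks)            # 2
print(img.file_map["__NONZERO"])   # 0
print(img.file_map["__COPY"])      # 1 -- the padded block, always written in full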