首页 > 其他分享 >mosaic.py

mosaic.py

时间:2023-06-24 18:47:53浏览次数:38  
标签:self py system sys state def ._ mosaic

#!/usr/bin/env python3

# Mosaic Emulator and Checker

import argparse
import ast
import copy
import inspect
import json
import random
from dataclasses import dataclass
from itertools import compress, product
from pathlib import Path
from typing import Callable, Generator

## 1. Mosaic system calls

### 1.1 Process, thread, and context switching

sys_fork = lambda: os.sys_fork()
sys_spawn = lambda fn, *args: os.sys_spawn(fn, *args)
sys_sched = lambda: os.sys_sched()

### 1.2 Virtual character device

sys_choose = lambda choices: os.sys_choose(choices)
sys_write = lambda *args: os.sys_write(*args)

### 1.3 Virtual block storage device

sys_bread = lambda k: os.sys_bread(k)
sys_bwrite = lambda k, v: os.sys_bwrite(k, v)
sys_sync = lambda: os.sys_sync()
sys_crash = lambda: os.sys_crash()

### 1.4 System call helpers

SYSCALLS = []

def syscall(func):  # @syscall decorator
    SYSCALLS.append(func.__name__)
    return func

## 2. Mosaic operating system emulator

### 2.1 Data structures

class Heap:
    pass  # no member: self.__dict__ is the heap

@dataclass
class Thread:
    context: Generator  # program counter, local variables, etc.
    heap: Heap  # a pointer to thread's "memory"

@dataclass
class Storage:
    persist: dict  # persisted storage state
    buf: dict  # outstanding operations yet to be persisted

### 2.2 The OperatingSystem class

class OperatingSystem:
    """An executable operating system model.

    The operating system model hosts a single Python application with a
    main() function accessible to a shared heap and 9 system calls
    (marked by the @syscall decorator). An example:

        def main():
            pid = sys_fork()
            sys_sched()  # non-deterministic context switch
            if pid == 0:
                sys_write('World')
            else:
                sys_write('Hello')

    At any moment, this model keeps tracking a set of threads and a
    "currently running" one. Each thread consists of a reference to a
    heap object (may be shared with other threads) and a private context
    (program counters, local variables, etc.). A thread context is a
    Python generator object, i.e., a stack-less coroutine [1] that can
    save the running context and yield itself.

    For applications, the keyword "yield" is reserved for system calls.
    For example, a "choose" system call [2]:

        sys_choose(['A', 'B'])

    is transpiled as yielding the string "sys_choose" followed by its
    parameters (choices):

        res = yield 'sys_choose', ['A', 'B'].

    Yield will transfer the control to the OS for system call handling
    and eventually returning a value ('A' or 'B') to the application.

    Right after transferring control to the OS by "yield", the function
    state is "frozen", where program counters and local variables are
    accessible via the generator object. Therefore, OS can serialize its
    internal state--all thread's contexts, heaps, and virtual device
    states at this moment.

    In this sense, operating system is a system-call driven state
    transition system:

        (s0) --run first thread (main)-> (s1)
             --sys_choose and application execution-> (s2)
             --sys_sched and application execution-> (s3) ...

    Real operating systems can be preemptive--context switching can
    happen non-deterministically at any program point, simply because
    processor can non-deterministically interrupt its currently running
    code and transfer the control to the operating system.

    The OS internal implementation does NOT immediately process the
    system call: it returns all possible choices available at the moment
    and their corresponding processing logic as callbacks. For the
    example above, the "choose" system call returns a non-deterministic
    choice among given choices. The internal implementation thus returns

        choices = {
            'A': (lambda: 'A'),
            'B': (lambda: 'B'),
        }

    for later processing. Another example is non-deterministic context
    switching by yielding 'sys_sched'. Suppose there are threads t1 and
    t2 at the moment. The system call handler will return

        choices = {
            't1': (lambda: switch_to(t1)),
            't2': (lambda: switch_to(t2)),
        }

    in which switch_to(th) replaces the OS's current running thread with
    th (changes the global "heap" variable). Such deferred execution of
    system calls separates the mechanism of non-deterministic choices
    from the actual decision makers (e.g., an interpreter or a model
    checker). Once the decision is made, the simply call step(choice)
    and the OS will execute this choice by

        choices[choice]()

    with the application code (generator) being resumed.

    This model provides "write" system call to immediately push data to
    a hypothetical character device like a tty associated with stdout.
    We model a block device (key-value store) that may lose data upon
    crash. The model assumes atomicity of each single block write (a
    key-value update). However, writes are merely to a volatile buffer
    which may non-deterministically lose data upon crash 3]. The "sync"
    system call persists buffered writes.

    References:

    [1] N. Schemenauer, T. Peters, and M. L. Hetland. PEP 255 -
        Simple generators. https://peps.python.org/pep-0255/
    [2] J. Yang, C. Sar, and D. Engler. eXplode: a lightweight, general
        system for finding serious storage system errors. OSDI'06.
    [3] T. S. Pillai, V. Chidambaram, R. Alagappan, A. Al-Kiswany, A. C.
        Arpaci-Dusseau, and R. H. Arpaci-Dusseau. All file systems are
        not created equal: On the complexity of crafting crash
        consistent applications. OSDI'14.
    """

    def __init__(self, init: Callable):
        """Create a new OS instance with pending-to-execute init thread."""
        # Operating system states
        self._threads = [Thread(context=init(), heap=Heap())]
        self._current = 0
        self._choices = {init.__name__: lambda: None}
        self._stdout = ''
        self._storage = Storage(persist={}, buf={})

        # Internal states
        self._init = init
        self._trace = []
        self._newfork = set()

### 2.3 System call implementation

#### 2.3.1 Process, thread, and context switching

    @syscall
    def sys_spawn(self, func: Callable, *args):
        """Spawn a heap-sharing new thread executing func(args)."""
        def do_spawn():
            self._threads.append(
                Thread(
                    context=func(*args),  # func() returns a new generator
                    heap=self.current().heap,  # shared heap
                )
            )
        return {'spawn': (lambda: do_spawn())}

    @syscall
    def sys_fork(self):
        """Create a clone of the current thread with a copied heap."""
        if all(not f.frame.f_locals['fork_child']
                for f in inspect.stack()
                    if f.function == '_step'):  # this is parent; do fork
            # Deep-copying generators causes troubles--they are twined with
            # Python's runtime state. We use an (inefficient) hack here: replay 
            # the entire trace and override the last fork() to avoid infinite
            # recursion.
            os_clone = OperatingSystem(self._init)
            os_clone.replay(self._trace[:-1])
            os_clone._step(self._trace[-1], fork_child=True)

            # Now os_clone._current is the forked process. Cloned thread just 
            # yields a sys_fork and is pending for fork()'s return value. It
            # is necessary to mark cloned threads (in self._newfork) and send
            # child's fork() return value when they are scheduled for the
            # first time.
            def do_fork():
                self._threads.append(os_clone.current())
                self._newfork.add((pid := len(self._threads)) - 1)
                return 1000 + pid  # returned pid starts from 1000

            return {'fork': (lambda: do_fork())}
        else:
            return None  # overridden fork; this value is never used because
                         # os_clone is destroyed immediately after fork()

    @syscall
    def sys_sched(self):
        """Return a non-deterministic context switch to a runnable thread."""
        return {
            f't{i+1}': (lambda i=i: self._switch_to(i))
                for i, th in enumerate(self._threads)
                    if th.context.gi_frame is not None  # thread still alive?
        }

### 2.3.2 Virtual character device (byte stream)

    @syscall
    def sys_choose(self, choices):
        """Return a non-deterministic value from choices."""
        return {f'choose {c}': (lambda c=c: c) for c in choices}

    @syscall
    def sys_write(self, *args):
        """Write strings (space separated) to stdout."""
        def do_write():
            self._stdout += ' '.join(str(arg) for arg in args)
        return {'write': (lambda: do_write())}

### 2.3.3 Virtual block storage device

    @syscall
    def sys_bread(self, key):
        """Return the specific key's associated value in block device."""
        storage = self._storage
        return {'bread': (lambda:
            storage.buf.get(key,  # always try to read from buffer first
                storage.persist.get(key, None)  # and then persistent storage
            )
        )}

    @syscall
    def sys_bwrite(self, key, value):
        """Write (key, value) pair to block device's buffer."""
        def do_bwrite():
            self._storage.buf[key] = value
        return {'bwrite': (lambda: do_bwrite())}

    @syscall
    def sys_sync(self):
        """Persist all buffered writes."""
        def do_sync():
            store = self._storage
            self._storage = Storage(
                persist=store.persist | store.buf,  # write back
                buf={}
            )
        return {'sync': (lambda: do_sync())}

    @syscall
    def sys_crash(self):
        """Simulate a system crash that non-deterministically persists
        outstanding writes in the buffer.
        """
        persist = self._storage.persist
        btrace = self._storage.buf.items()  # block trace

        crash_sites = (
            lambda subset=subset:
                setattr(self, '_storage',
                    Storage(  # persist only writes in the subset
                        persist=persist | dict(compress(btrace, subset)),
                        buf={}
                    )
                ) for subset in  # Mosaic allows persisting any subset of
                    product(     # pending blocks in the buffer
                        *([(0, 1)] * len(btrace))
                    )
        )
        return dict(enumerate(crash_sites))

### 2.4 Operating system as a state machine

    def replay(self, trace: list) -> dict:
        """Replay an execution trace and return the resulting state."""
        for choice in trace:
            self._step(choice)
        return self.state_dump()

    def _step(self, choice, fork_child=False):
        self._switch_to(self._current)
        self._trace.append(choice)  # keep all choices for replay-based fork()
        action = self._choices[choice]  # return value of sys_xxx: a lambda
        res = action()

        try:  # Execute current thread for one step
            func, args = self.current().context.send(res)
            assert func in SYSCALLS
            self._choices = getattr(self, func)(*args)
        except StopIteration:  # ... and thread terminates
            self._choices = self.sys_sched()

        # At this point, the operating system's state is
        #   (self._threads, self._current, self._stdout, self._storage)
        # and outgoing transitions are saved in self._choices.

### 2.5 Misc and helper functions

    def state_dump(self) -> dict:
        """Create a serializable Mosaic state dump with hash code."""
        heaps = {}
        for th in self._threads:
            if (i := id(th.heap)) not in heaps:  # unique heaps
                heaps[i] = len(heaps) + 1

        os_state = {
            'current': self._current,
            'choices': sorted(list(self._choices.keys())),
            'contexts': [
                {
                    'name': th.context.gi_frame.f_code.co_name,
                    'heap': heaps[id(th.heap)],  # the unique heap id
                    'pc': th.context.gi_frame.f_lineno,
                    'locals': th.context.gi_frame.f_locals,
                } if th.context.gi_frame is not None else None
                    for th in self._threads
            ],
            'heaps': {
                heaps[id(th.heap)]: th.heap.__dict__
                    for th in self._threads
            },
            'stdout': self._stdout,
            'store_persist': self._storage.persist,
            'store_buffer': self._storage.buf,
        }

        h = hash(json.dumps(os_state, sort_keys=True)) + 2**63
        return (copy.deepcopy(os_state)  # freeze the runtime state
                | dict(hashcode=f'{h:016x}'))

    def current(self) -> Thread:
        """Return the current running thread object."""
        return self._threads[self._current]

    def _switch_to(self, tid: int):
        self._current = tid
        globals()['os'] = self
        globals()['heap'] = self.current().heap
        if tid in self._newfork:
            self._newfork.remove(tid)  # tricky: forked process must receive 0
            return 0                   # to indicate a child

## 3. The Mosaic runtime

class Mosaic:
    """The operating system interpreter and model checker.

    The operating system model is a state transition system: os.replay()
    maps any trace to a state (with its outgoing transitions). Based
    on this model, two state space explorers are implemented:

    - run:   Choose outgoing transitions uniformly at random, yielding a
             single execution trace.
    - check: Exhaustively explore all reachable states by a breadth-
             first search. Duplicated states are not visited twice.

    Both explorers produce the visited portion of the state space as a
    serializable object containing:

    - source:   The application source code
    - vertices: A list of operating system state dumps. The first vertex
                in the list is the initial state. Each vertex has a
                unique "hashcode" id.
    - edges:    A list of 3-tuples: (source, target, label) denoting an 
                explored source --[label]-> target edge. Both source and
                target are state hashcode ids.
    """

### 3.1 Model interpreter and checker

    def run(self) -> dict:
        """Interpret the model with non-deterministic choices."""
        os = OperatingSystem(self.entry)
        V, E = [os.state_dump() | dict(depth=0)], []

        while (choices := V[-1]['choices']):
            choice = random.choice(choices)  # uniformly at random
            V.append(os.replay([choice]) | dict(depth=len(V)))
            E.append((V[-2]['hashcode'], V[-1]['hashcode'], choice))

        return dict(source=self.src, vertices=V, edges=E)

    def check(self) -> dict:
        """Exhaustively explore the state space."""
        class State:
            entry = self.entry

            def __init__(self, trace):
                self.trace = trace
                self.state = OperatingSystem(State.entry).replay(trace)
                self.state |= dict(depth=0)
                self.hashcode = self.state['hashcode']

            def extend(self, c):
                st = State(self.trace + (c,))
                st.state = st.state | dict(depth=self.state['depth'] + 1)
                return st

        st0 = State(tuple())  # initial state of empty trace
        queued, V, E = [st0], {st0.hashcode: st0.state}, []

        while queued:
            st = queued.pop(0)
            for choice in st.state['choices']:
                st1 = st.extend(choice)
                if st1.hashcode not in V:  # found an unexplored state
                    V[st1.hashcode] = st1.state
                    queued.append(st1)
                E.append((st.hashcode, st1.hashcode, choice))

        return dict(
            source=self.src,
            vertices=sorted(V.values(), key=lambda st: st['depth']),
            edges=E
        )

### 3.1 Source code parsing and rewriting

    class Transformer(ast.NodeTransformer):
        def visit_Call(self, node):
            # Rewrite system calls as yields
            if (isinstance(node.func, ast.Name) and
                    node.func.id in SYSCALLS):  # rewrite system calls
                return ast.Yield(ast.Tuple(     #   -> yield ('sys_xxx', args)
                    elts=[
                        ast.Constant(value=node.func.id),
                        ast.Tuple(elts=node.args),
                    ]
                ))
            else:
                return node

    def __init__(self, src: str):
        tree = ast.parse(src)
        hacked_ast = self.Transformer().visit(tree)
        hacked_src = ast.unparse(hacked_ast)

        context = {}
        exec(hacked_src, globals(), context)
        globals().update(context)

        self.src = src
        self.entry = context['main']  # must have a main()

## 4. Utilities

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='The modeled operating system and state explorer.'
    )
    parser.add_argument(
        'source',
        help='application code (.py) to be checked; must have a main()'
    )
    parser.add_argument('-r', '--run', action='store_true')
    parser.add_argument('-c', '--check', action='store_true')
    args = parser.parse_args()

    src = Path(args.source).read_text()
    mosaic = Mosaic(src)
    if args.check:
        explored = mosaic.check()
    else:
        explored = mosaic.run()  # run is the default option

    # Serialize the explored states and write to stdout. This encourages piping
    # the results to another tool following the UNIX philosophy. Examples:
    #
    #   mosaic --run foo.py | grep stdout | tail -n 1  # quick and dirty check
    #   mosaic --check bar.py | fx  # or any other interactive visualizer
    #
    print(json.dumps(explored, ensure_ascii=False, indent=2))

标签:self,py,system,sys,state,def,._,mosaic
From: https://www.cnblogs.com/c-x-a/p/17501474.html

相关文章

  • Python random模块
    Pythonrandom模块random模块用于生成随机数importrandomprint(random.random())print(random.randint(1,100))print(random.randrange(1,100))输出结果:0.182467957909153044666randint和randrange的区别##########randint##########defrandint(self,......
  • python实现文件批量压缩
    python实现文件夹下文件的批量压缩初级版本压缩之前压缩之后:源代码使用的时候只需要把“directory_path=r'D:\桌面\ceshi'”这里的路径修改成对应的文件夹即可importosimportzipfiledefzip_files_in_directory(directory):#获取目录中的文件列表fi......
  • Python configparser模块
    Pythonconfigparser模块用于生成和修改常见配置文档,当前模块的名称在python3.x版本中变更为configparser。常见文档格式如下[DEFAULT]ServerAliveInterval=45Compression=yesCompressionLevel=9ForwardX11=yes[bitbucket.org]User=hg[topsecret.se......
  • 0.组织Python代码的技巧
    组织Python代码技巧英文版链接一.构建Python项目首先关注目录结构、文件命名和模块组织。我建议把所有的模块都放在src目录下,所有测试都和它并排。python项目的组织结构---建议把所有的模块都放在src目录下,所有测试都和它并排。如下图:<project>├──src│├......
  • Python json和pickle模块
    json和pickle模块用于序列化的两个模块序列化模块:json和picklejson,用于字符串和python数据类型间进行转换pickle,用于python特有的类型和python的数据类型间进行转换Json模块提供了四个功能:dumps、dump、loads、loadpickle模块提供了四个功能:dumps、dump、loads、......
  • Python潮流周刊#8:Python 3.13 计划将解释器提速 50%!
    你好,我是猫哥。这里每周分享优质的Python及通用技术内容,部分为英文,已在小标题注明。(标题取自其中一则分享,不代表全部内容都是该主题,特此声明。)首发于我的博客:https://pythoncat.top/posts/2023-06-24-weekly......
  • Python subprocess模块
    Pythonsubprocess模块subprocess模块可以执行shell命令的相关模块和函数有:os.systemos.spawn*os.popen*--废弃popen2.*--废弃commands.*--废弃,3.x中被移除以上执行shell命令的相关的模块和函数的功能均在subprocess模块中实现,并提供......
  • Python shutil模块
    Pythonshutil模块高级的文件、文件夹、压缩包处理模块将文件内容拷贝到另一个文件中,可以部分内容shutil.copyfileobj(fsrc,fdst[,length])例子importshutilf1=open("a.txt",encoding="utf-8")f2=open("b.txt","w",encoding="utf-8")shu......
  • 【python基础】文件-初识文件
    文本文件可存储的数据量是非常多的。每当需要分析或修改存储在文件中的信息时,首先就是读取文件到内存中,为此可以一次性读取文件的全部内容,也可以以每次一行的方式逐步读取。1.读取文件1.1读取整个文件要读取文件,需要一个包含几行文本的文件。下面首先来创建一个poems文本文件,,里......
  • Python os模块
    Pythonos模块os模块用于提供系统级别的操作os.getcwd()#获取当前工作目录,即当前python脚本工作的目录路径os.chdir("dirname")#改变当前脚本工作目录;相当于shell下cdos.curdir#返回当前目录:('.')os.pardir#获取当前目录......