首页 > 其他分享 >realsense d435i获取imu数据

realsense d435i获取imu数据

时间:2023-05-07 09:04:13浏览次数:39  
标签:d435i gyro self realsense print imu np data

 

 

#!/usr/bin/python
from __future__ import print_function
import numpy as np
import sys
import json
import ctypes
import os
import binascii
import struct
import pyrealsense2 as rs
import ctypes
import time
import enum
import threading

# L515
READ_TABLE  = 0x43     # READ_TABLE 0x243 0
WRITE_TABLE = 0x44     # WRITE_TABLE 0 <table>

# L515 minimum firmware version required to support IMU calibration
L515_FW_VER_REQUIRED = '01.04.01.00'

is_data = None
get_key = None
if os.name == 'posix':
    import select
    import tty
    import termios

    is_data = lambda : select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], [])
    get_key = lambda : sys.stdin.read(1)

elif os.name == 'nt':
    import msvcrt
    is_data = msvcrt.kbhit
    get_key = lambda : msvcrt.getch()

else:
    raise Exception('Unsupported OS: %s' % os.name)

if sys.version_info[0] < 3:
    input = raw_input

max_float = struct.unpack('f',b'\xff\xff\xff\xff')[0]
max_int = struct.unpack('i',b'\xff\xff\xff\xff')[0]
max_uint8 = struct.unpack('B', b'\xff')[0]

g = 9.80665 # SI Gravity page 52 of https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication330e2008.pdf

COLOR_RED   = "\033[1;31m"  
COLOR_BLUE  = "\033[1;34m"
COLOR_CYAN  = "\033[1;36m"
COLOR_GREEN = "\033[0;32m"
COLOR_RESET = "\033[0;0m"
COLOR_BOLD    = "\033[;1m"
COLOR_REVERSE = "\033[;7m"

def int_to_bytes(num, length=4, order='big'):
    res = bytearray(length)
    for i in range(length):
        res[i] = num & 0xff
        num >>= 8
    if num:
        raise OverflowError("Number {} doesn't fit into {} bytes.".format(num, length))
    if order == 'little':
        res.reverse()
    return res


def bytes_to_uint(bytes_array, order='little'):
    bytes_array = list(bytes_array)
    bytes_array.reverse()
    if order == 'little':
        return struct.unpack('>i', struct.pack('BBBB', *([0] * (4 - len(bytes_array))) + bytes_array))[0] & 0xffffffff
    else:
        return struct.unpack('>i', struct.pack('BBBB', *([0] * (4 - len(bytes_array))) + bytes_array))[0] & 0xffffffff


class imu_wrapper:
    class Status(enum.Enum):
        idle = 0,
        rotate = 1,
        wait_to_stable = 2,
        collect_data = 3

    def __init__(self):
        self.pipeline = None
        self.imu_sensor = None
        self.status = self.Status(self.Status.idle)     # 0 - idle, 1 - rotate to position, 2 - wait to stable, 3 - pick data
        self.thread = threading.Condition()
        self.step_start_time = time.time()
        self.time_to_stable = 3
        self.time_to_collect = 2
        self.samples_to_collect = 1000
        self.rotating_threshold = 0.1
        self.moving_threshold_factor = 0.1
        self.collected_data_gyro = []
        self.collected_data_accel = []
        self.callback_lock = threading.Lock()
        self.max_norm = np.linalg.norm(np.array([0.5, 0.5, 0.5]))
        self.line_length = 20
        self.is_done = False
        self.is_data = False

    def escape_handler(self):
        self.thread.acquire()
        self.status = self.Status.idle
        self.is_done = True
        self.thread.notify()
        self.thread.release()
        sys.exit(-1)

    def imu_callback(self, frame):
        if not self.is_data:
            self.is_data = True

        with self.callback_lock:
            try:
                if is_data():
                    c = get_key()
                    if c == '\x1b':         # x1b is ESC
                        self.escape_handler()

                if self.status == self.Status.idle:
                    return
                pr = frame.get_profile()
                data = frame.as_motion_frame().get_motion_data()
                data_np = np.array([data.x, data.y, data.z])
                elapsed_time = time.time() - self.step_start_time

                ## Status.collect_data
                if self.status == self.Status.collect_data:
                    sys.stdout.write('\r %15s' % self.status)
                    part_done = len(self.collected_data_accel) / float(self.samples_to_collect)
                    # sys.stdout.write(': %-3.1f (secs)' % (self.time_to_collect - elapsed_time))

                    color = COLOR_GREEN
                    if pr.stream_type() == rs.stream.gyro:
                        self.collected_data_gyro.append(np.append(frame.get_timestamp(), data_np))
                        is_moving = any(abs(data_np) > self.rotating_threshold)
                    else:
                        is_in_norm = np.linalg.norm(data_np - self.crnt_bucket) < self.max_norm
                        if is_in_norm:
                            self.collected_data_accel.append(np.append(frame.get_timestamp(), data_np))
                        else:
                            color = COLOR_RED
                        is_moving = abs(np.linalg.norm(data_np) - g) / g > self.moving_threshold_factor

                        sys.stdout.write(color)
                        sys.stdout.write('['+'.'*int(part_done*self.line_length)+' '*int((1-part_done)*self.line_length) + ']')
                        sys.stdout.write(COLOR_RESET)

                    if is_moving:
                        print('WARNING: MOVING')
                        self.status = self.Status.rotate
                        return

                    # if elapsed_time > self.time_to_collect:
                    if part_done >= 1:
                        self.status = self.Status.collect_data
                        sys.stdout.write('\n\nDirection data collected.')
                        self.thread.acquire()
                        self.status = self.Status.idle
                        self.thread.notify()
                        self.thread.release()
                        return

                if pr.stream_type() == rs.stream.gyro:
                    return
                sys.stdout.write('\r %15s' % self.status)
                crnt_dir = np.array(data_np) / np.linalg.norm(data_np)
                crnt_diff = self.crnt_direction - crnt_dir
                is_in_norm = np.linalg.norm(data_np - self.crnt_bucket) < self.max_norm               

                ## Status.rotate
                if self.status == self.Status.rotate:
                    sys.stdout.write(': %35s' % (np.array2string(crnt_diff,  precision=4, suppress_small=True)))
                    sys.stdout.write(': %35s' % (np.array2string(abs(crnt_diff) < 0.1)))
                    if is_in_norm:
                        self.status = self.Status.wait_to_stable
                        sys.stdout.write('\r'+' '*90)
                        self.step_start_time = time.time()
                        return

                ## Status.wait_to_stable
                if self.status == self.Status.wait_to_stable:
                    sys.stdout.write(': %-3.1f (secs)' % (self.time_to_stable - elapsed_time))
                    if not is_in_norm:
                        self.status = self.Status.rotate
                        return
                    if elapsed_time > self.time_to_stable:
                        self.collected_data_gyro = []
                        self.collected_data_accel = []
                        self.status = self.Status.collect_data
                        self.step_start_time = time.time()
                        return
                return
            except Exception as e:
                print('ERROR?' + str(e))
                self.thread.acquire()
                self.status = self.Status.idle
                self.thread.notify()
                self.thread.release()

    def get_measurements(self, buckets, bucket_labels):
        measurements = []
        print('-------------------------')
        print('*** Press ESC to Quit ***')
        print('-------------------------')
        for bucket,bucket_label in zip(buckets, bucket_labels):
            self.crnt_bucket = np.array(bucket)
            self.crnt_direction = np.array(bucket) / np.linalg.norm(np.array(bucket))
            print('\nAlign to direction: ', self.crnt_direction, ' ', bucket_label)
            self.status = self.Status.rotate
            self.thread.acquire()
            while (not self.is_done and self.status != self.Status.idle):
                self.thread.wait(3)
                if not self.is_data:
                    raise Exception('No IMU data. Check connectivity.')
            if self.is_done:
                raise Exception('User Abort.')
            measurements.append(np.array(self.collected_data_accel))
        return np.array(measurements), np.array(self.collected_data_gyro)

    def enable_imu_device(self, serial_no):
        self.pipeline = rs.pipeline()
        cfg = rs.config()
        cfg.enable_device(serial_no)
        try:
            self.pipeline.start(cfg)
        except Exception as e:
            print('ERROR: ', str(e))
            return False

        # self.sync_imu_by_this_stream = rs.stream.any
        active_imu_profiles = []

        active_profiles = dict()
        self.imu_sensor = None
        for sensor in self.pipeline.get_active_profile().get_device().sensors:
            for pr in sensor.get_stream_profiles():
                if pr.stream_type() == rs.stream.gyro and pr.format() == rs.format.motion_xyz32f:
                    active_profiles[pr.stream_type()] = pr
                    self.imu_sensor = sensor
                if pr.stream_type() == rs.stream.accel and pr.format() == rs.format.motion_xyz32f:
                    active_profiles[pr.stream_type()] = pr
                    self.imu_sensor = sensor
            if self.imu_sensor:
                break
        if not self.imu_sensor:
            print('No IMU sensor found.')
            return False
        print('\n'.join(['FOUND %s with fps=%s' % (str(ap[0]).split('.')[1].upper(), ap[1].fps()) for ap in active_profiles.items()]))
        active_imu_profiles = list(active_profiles.values())
        if len(active_imu_profiles) < 2:
            print('Not all IMU streams found.')
            return False
        self.imu_sensor.stop()
        self.imu_sensor.close()
        self.imu_sensor.open(active_imu_profiles)
        self.imu_start_loop_time = time.time()
        self.imu_sensor.start(self.imu_callback)

        # Make the device use the original IMU values and not already calibrated:
        if self.imu_sensor.supports(rs.option.enable_motion_correction):
            self.imu_sensor.set_option(rs.option.enable_motion_correction, 0)
        return True

class CHeader:
    def __init__(self, version, table_type):
        self.buffer = np.ones(16, dtype=np.uint8) * 255
        self.buffer[0] = int(version[0], 16)
        self.buffer[1] = int(version[1], 16)
        self.buffer.dtype=np.uint16
        self.buffer[1] = int(table_type, 16)

    def size(self):
        return 16

    def set_data_size(self, size):
        self.buffer.dtype=np.uint32
        self.buffer[1] = size

    def set_crc32(self, crc32):
        self.buffer.dtype=np.uint32
        self.buffer[3] = crc32 % (1<<32)    # convert from signed to unsigned 32 bit
    
    def get_buffer(self):
        self.buffer.dtype=np.uint8
        return self.buffer


def bitwise_int_to_float(ival):
    return struct.unpack('f', struct.pack('i', ival))[0]
    
def bitwise_float_to_int(fval):
    return struct.unpack('i', struct.pack('f', fval))[0]

def parse_buffer(buffer):
    cmd_size = 24
    header_size = 16

    buffer.dtype=np.uint32    
    tab1_size = buffer[3]
    buffer.dtype=np.uint8
    print('tab1_size (all_data): ', tab1_size)

    tab1 = buffer[cmd_size:cmd_size+tab1_size]  # 520 == epprom++
    tab1.dtype=np.uint32
    tab2_size = tab1[1]
    tab1.dtype=np.uint8
    print('tab2_size (calibration_table): ', tab2_size)

    tab2 = tab1[header_size:header_size+tab2_size] # calibration table
    tab2.dtype=np.uint32
    tab3_size = tab2[1]
    tab2.dtype=np.uint8
    print('tab3_size (calibration_table): ', tab3_size)

    tab3 = tab2[header_size:header_size+tab3_size]  # D435 IMU Calib Table
    tab3.dtype=np.uint32
    tab4_size = tab3[1]
    tab3.dtype=np.uint8
    print('tab4_size (D435_IMU_Calib_Table): ', tab4_size)

    tab4 = tab3[header_size:header_size+tab4_size]  # calibration data
    return tab1, tab2, tab3, tab4

def get_IMU_Calib_Table(X, product_line):
    version = ['0x02', '0x01']
    table_type = '0x20'

    if product_line == 'L500':
        version = ['0x05', '0x01']
        table_type = '0x243'

    header = CHeader(version, table_type)
    
    header_size = header.size()
    data_size = 37*4 + 96
    size_of_buffer = header_size + data_size    # according to table "D435 IMU Calib Table" here: https://user-images.githubusercontent.com/6958867/50902974-20507500-1425-11e9-8ca5-8bd2ac2d0ea1.png
    assert(size_of_buffer % 4 == 0)
    buffer = np.ones(size_of_buffer, dtype=np.uint8) * 255

    use_extrinsics = False
    use_intrinsics = True

    data_buffer = np.ones(data_size, dtype=np.uint8) * 255
    data_buffer.dtype = np.float32

    data_buffer[0] = bitwise_int_to_float(np.int32(int(use_intrinsics)) << 8 | 
                                          np.int32(int(use_extrinsics)))

    intrinsic_vector = np.zeros(24, dtype=np.float32)
    intrinsic_vector[:9] = X[:3,:3].T.flatten()
    intrinsic_vector[9:12] = X[:3,3]
    intrinsic_vector[12:21] = X[3:,:3].flatten()
    intrinsic_vector[21:24] = X[3:,3]    

    data_buffer[13:13+X.size] = intrinsic_vector
    data_buffer.dtype = np.uint8

    header.set_data_size(data_size)

    header.set_crc32(binascii.crc32(data_buffer))
    buffer[:header_size] = header.get_buffer()
    buffer[header_size:] = data_buffer
    return buffer


def get_calibration_table(d435_imu_calib_table):
    version = ['0x02', '0x00']
    table_type = '0x20'

    header = CHeader(version, table_type)

    d435_imu_calib_table_size = d435_imu_calib_table.size
    sn_table_size = 32
    data_size = d435_imu_calib_table_size + sn_table_size

    header_size = header.size()
    size_of_buffer = header_size + data_size    # according to table "D435 IMU Calib Table" in "https://sharepoint.ger.ith.intel.com/sites/3D_project/Shared%20Documents/Arch/D400/FW/D435i_IMU_Calibration_eeprom_0_52.xlsx"
    assert(size_of_buffer % 4 == 0)
    buffer = np.ones(size_of_buffer, dtype=np.uint8) * 255

    data_buffer = np.ones(data_size, dtype=np.uint8) * 255
    data_buffer[:d435_imu_calib_table_size] = d435_imu_calib_table

    header.set_data_size(data_size)
    header.set_crc32(binascii.crc32(data_buffer))

    buffer[:header_size] = header.get_buffer()
    buffer[header_size:header_size+data_size] = data_buffer
    return buffer

def get_eeprom(calibration_table):
    version = ['0x01', '0x01']
    table_type = '0x09'

    header = CHeader(version, table_type)

    DC_MM_EEPROM_SIZE = 520
    # data_size = calibration_table.size

    header_size = header.size()
    size_of_buffer = DC_MM_EEPROM_SIZE
    data_size = size_of_buffer - header_size 
    # size_of_buffer = header_size + data_size
    
    assert(size_of_buffer % 4 == 0)
    buffer = np.ones(size_of_buffer, dtype=np.uint8) * 255

    header.set_data_size(data_size)
    buffer[header_size:header_size+calibration_table.size] = calibration_table
    header.set_crc32(binascii.crc32(buffer[header_size:]))

    buffer[:header_size] = header.get_buffer()

    return buffer

def write_eeprom_to_camera(eeprom, serial_no=''):
    # DC_MM_EEPROM_SIZE = 520
    DC_MM_EEPROM_SIZE = eeprom.size
    DS5_CMD_LENGTH = 24

    MMEW_Cmd_bytes = b'\x14\x00\xab\xcd\x50\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'


    buffer = np.ones([DC_MM_EEPROM_SIZE + DS5_CMD_LENGTH, ], dtype = np.uint8) * 255
    cmd = np.array(struct.unpack('I'*6, MMEW_Cmd_bytes), dtype=np.uint32)
    cmd.dtype = np.uint16
    cmd[0] += DC_MM_EEPROM_SIZE
    cmd.dtype = np.uint32
    cmd[3] = DC_MM_EEPROM_SIZE  # command 1 = 0x50
                                # command 2 = 0
                                # command 3 = size
    cmd.dtype = np.uint8
    buffer[:len(cmd)] = cmd
    buffer[len(cmd):len(cmd)+eeprom.size] = eeprom

    debug = get_debug_device(serial_no)
    if not debug:
        print('Error getting RealSense Device.')
        return
    # tab1, tab2, tab3, tab4 = parse_buffer(buffer)

    rcvBuf = debug.send_and_receive_raw_data(bytearray(buffer))
    if rcvBuf[0] == buffer[4]:
        print('SUCCESS: saved calibration to camera.')
    else:
        print('FAILED: failed to save calibration to camera.')
        print(rcvBuf)


def get_debug_device(serial_no):
    ctx = rs.context()
    devices = ctx.query_devices()
    found_dev = False
    for dev in devices:
        if len(serial_no) == 0 or serial_no == dev.get_info(rs.camera_info.serial_number):
            found_dev = True
            break
    if not found_dev:
        print('No RealSense device found' + str('.' if len(serial_no) == 0 else ' with serial number: '+serial_no))
        return 0

    # print(a few basic information about the device)
    print('  Device PID: ',  dev.get_info(rs.camera_info.product_id))
    print('  Device name: ',  dev.get_info(rs.camera_info.name))
    print('  Serial number: ',  dev.get_info(rs.camera_info.serial_number))
    print('  Firmware version: ',  dev.get_info(rs.camera_info.firmware_version))
    debug = rs.debug_protocol(dev)
    return debug

def check_X(X, accel, show_graph):
    fdata = np.apply_along_axis(np.dot, 1, accel, X[:3,:3]) - X[3,:]
    norm_data = (accel**2).sum(axis=1)**(1./2)
    norm_fdata = (fdata**2).sum(axis=1)**(1./2)
    if show_graph:
        import pylab
        pylab.plot(norm_data, '.b')
        #pylab.hold(True)
        pylab.plot(norm_fdata, '.g')
        pylab.show()
    print ('norm (raw data  ): %f' % np.mean(norm_data))
    print ('norm (fixed data): %f' % np.mean(norm_fdata), "A good calibration will be near %f" % g)

def l500_send_command(dev, op_code, param1=0, param2=0, param3=0, param4=0, data=[], retries=1):

    for i in range(retries):
        try:
            debug_device = rs.debug_protocol(dev)
            gvd_command_length = 0x14 + len(data)
            magic_number1 = 0xab
            magic_number2 = 0xcd

            buf = bytearray()
            buf += bytes(int_to_bytes(gvd_command_length, 2))
            #buf += bytes(int_to_bytes(0, 1))
            buf += bytes(int_to_bytes(magic_number1, 1))
            buf += bytes(int_to_bytes(magic_number2, 1))
            buf += bytes(int_to_bytes(op_code))
            buf += bytes(int_to_bytes(param1))
            buf += bytes(int_to_bytes(param2))
            buf += bytes(int_to_bytes(param3))
            buf += bytes(int_to_bytes(param4))
            buf += bytearray(data)
            l = list(buf)
            res = debug_device.send_and_receive_raw_data(buf)

            if res[0] == op_code:
                res1 = res[4:]
                return res1
            else:
                raise Exception("send_command return error", res[0])
        except:
            if i < retries - 1:
                time.sleep(0.1)
            else:
                raise

def wait_for_rs_device(serial_no):
    ctx = rs.context()

    start = int(round(time.time() * 1000))
    now = int(round(time.time() * 1000))

    while now - start < 5000:
        devices = ctx.query_devices()
        for dev in devices:
            pid = str(dev.get_info(rs.camera_info.product_id))
            if len(serial_no) == 0 or serial_no == dev.get_info(rs.camera_info.serial_number):

                # print(a few basic information about the device)
                print('  Device PID: ',  dev.get_info(rs.camera_info.product_id))
                print('  Device name: ',  dev.get_info(rs.camera_info.name))
                print('  Serial number: ',  dev.get_info(rs.camera_info.serial_number))
                print('  Product Line: ',  dev.get_info(rs.camera_info.product_line))
                print('  Firmware version: ',  dev.get_info(rs.camera_info.firmware_version))

                return dev
        time.sleep(5)
        now = int(round(time.time() * 1000))
    raise Exception('No RealSense device' + str('.' if len(serial_no) == 0 else ' with serial number: '+serial_no))


def main():
    if any([help_str in sys.argv for help_str in ['-h', '--help', '/?']]):
        print("Usage:", sys.argv[0], "[Options]")
        print
        print('[Options]:')
        print('-i : /path/to/accel.txt [/path/to/gyro.txt]')
        print('-s : serial number of device to calibrate.')
        print('-g : show graph of norm values - original values in blue and corrected in green.')
        print
        print('If -i option is given, calibration is done using previosly saved files')
        print('Otherwise, an interactive process is followed.')
        sys.exit(1)

    try:
        accel_file = None
        gyro_file = None
        serial_no = ''
        show_graph = '-g' in sys.argv
        for idx in range(len(sys.argv)):
            if sys.argv[idx] == '-i':
                accel_file = sys.argv[idx+1]
                if len(sys.argv) > idx+2 and not sys.argv[idx+2].startswith('-'):
                    gyro_file = sys.argv[idx+2]
            if sys.argv[idx] == '-s':
                serial_no = sys.argv[idx+1]

        print('waiting for realsense device...')

        dev = wait_for_rs_device(serial_no)

        product_line = dev.get_info(rs.camera_info.product_line)

        if product_line == 'L500':
            print('checking minimum firmware requirement ...')
            fw_version = dev.get_info(rs.camera_info.firmware_version)
            if fw_version < L515_FW_VER_REQUIRED:
                raise Exception('L515 requires firmware ' + L515_FW_VER_REQUIRED + " or later to support IMU calibration. Please upgrade firmware and try again.")
            else:
                print('  firmware ' + fw_version + ' passed check.')

        buckets = [[0, -g,  0], [ g,  0, 0],
                [0,  g,  0], [-g,  0, 0],
                [0,  0, -g], [ 0,  0, g]]

        # all D400 and L500 cameras with IMU equipped with a mounting screw at the bottom of the device
        # when device is in normal use position upright facing out, mount screw is pointing down, aligned with positive Y direction in depth coordinate system
        # IMU output on each of these devices is transformed into the depth coordinate system, i.e.,
        # looking from back of the camera towards front, the positive x-axis points to the right, the positive y-axis points down, and the positive z-axis points forward.
        # output of motion data is consistent with convention that positive direction aligned with gravity leads to -1g and opposite direction leads to +1g, for example,
        # positive z_aixs points forward away from front glass of the device,
        #  1) if place the device flat on a table, facing up, positive z-axis points up, z-axis acceleration is around +1g
        #  2) facing down, positive z-axis points down, z-axis accleration would be around -1g
        #
        buckets_labels = ["Mounting screw pointing down, device facing out", "Mounting screw pointing left, device facing out", "Mounting screw pointing up, device facing out", "Mounting screw pointing right, device facing out", "Viewing direction facing down", "Viewing direction facing up"]

        gyro_bais = np.zeros(3, np.float32)
        old_settings = None
        if accel_file:
            if gyro_file:
                #compute gyro bais

                #assume the first 4 seconds the device is still
                gyro = np.loadtxt(gyro_file, delimiter=",")
                gyro = gyro[gyro[:, 0] < gyro[0, 0]+4000, :]

                gyro_bais = np.mean(gyro[:, 1:], axis=0)
                print(gyro_bais)

            #compute accel intrinsic parameters
            max_norm = np.linalg.norm(np.array([0.5, 0.5, 0.5]))

            measurements = [[], [], [], [], [], []]
            import csv
            with open(accel_file, 'r') as csvfile:
                reader = csv.reader(csvfile)
                rnum = 0
                for row in reader:
                    M = np.array([float(row[1]), float(row[2]), float(row[3])])
                    is_ok = False
                    for i in range(0, len(buckets)):
                        if np.linalg.norm(M - buckets[i]) < max_norm:
                            is_ok = True
                            measurements[i].append(M)
                    rnum += 1
            print('read %d rows.' % rnum)
        else:
            print('Start interactive mode:')
            if os.name == 'posix':
                old_settings = termios.tcgetattr(sys.stdin)
                tty.setcbreak(sys.stdin.fileno())

            imu = imu_wrapper()
            if not imu.enable_imu_device(serial_no):
                print('Failed to enable device.')
                return -1
            measurements, gyro = imu.get_measurements(buckets, buckets_labels)
            con_mm = np.concatenate(measurements)
            if os.name == 'posix':
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

            header = input('\nWould you like to save the raw data? Enter footer for saving files (accel_<footer>.txt and gyro_<footer>.txt)\nEnter nothing to not save raw data to disk. >')
            print('\n')
            if header:
                accel_file = 'accel_%s.txt' % header
                gyro_file = 'gyro_%s.txt' % header
                print('Writing files:\n%s\n%s' % (accel_file, gyro_file))
                np.savetxt(accel_file, con_mm, delimiter=',', fmt='%s')
                np.savetxt(gyro_file, gyro, delimiter=',', fmt='%s')
            else:
                print('Not writing to files.')
            # remove times from measurements:
            measurements = [mm[:,1:] for mm in measurements]

            gyro_bais = np.mean(gyro[:, 1:], axis=0)
            print(gyro_bais)

        mlen = np.array([len(meas) for meas in measurements])
        print(mlen)
        print('using %d measurements.' % mlen.sum())

        nrows = mlen.sum()
        w = np.zeros([nrows, 4])
        Y = np.zeros([nrows, 3])
        row = 0
        for i in range(0, len(buckets)):
            for m in measurements[i]:
                w[row, 0] = m[0]
                w[row, 1] = m[1]
                w[row, 2] = m[2]
                w[row, 3] = -1
                Y[row, 0] = buckets[i][0]
                Y[row, 1] = buckets[i][1]
                Y[row, 2] = buckets[i][2]
                row += 1
        np_version = [int(x) for x in np.version.version.split('.')]
        rcond_val = None if (np_version[1] >= 14 or np_version[0] > 1) else -1
        X, residuals, rank, singular = np.linalg.lstsq(w, Y, rcond=rcond_val)

        print(X)
        print("residuals:", residuals)
        print("rank:", rank)
        print("singular:", singular)
        check_X(X, w[:,:3], show_graph)

        calibration = {}

        if product_line == 'L500':
            calibration["device_type"] = "L515"
        else:
            calibration["device_type"] = "D435i"

        calibration["imus"] = list()
        calibration["imus"].append({})
        calibration["imus"][0]["accelerometer"] = {}
        calibration["imus"][0]["accelerometer"]["scale_and_alignment"] = X.flatten()[:9].tolist()
        calibration["imus"][0]["accelerometer"]["bias"] = X.flatten()[9:].tolist()
        calibration["imus"][0]["gyroscope"] = {}
        calibration["imus"][0]["gyroscope"]["scale_and_alignment"] = np.eye(3).flatten().tolist()
        calibration["imus"][0]["gyroscope"]["bias"] = gyro_bais.tolist()
        json_data = json.dumps(calibration, indent=4, sort_keys=True)

        directory = os.path.dirname(accel_file) if accel_file else '.'

        with open(os.path.join(directory,"calibration.json"), 'w') as outfile:
            outfile.write(json_data)

        #concatinate the two 12 element arrays and save
        intrinsic_buffer = np.zeros([6,4])

        intrinsic_buffer[:3,:4] = X.T
        intrinsic_buffer[3:,:3] = np.eye(3)
        intrinsic_buffer[3:,3] = gyro_bais

        # intrinsic_buffer = ((np.array(range(24),np.float32)+1)/10).reshape([6,4])

        imu_calib_table = get_IMU_Calib_Table(intrinsic_buffer, product_line)

        with open(os.path.join(directory,"calibration.bin"), 'wb') as outfile:
            outfile.write(imu_calib_table.astype('f').tostring())

        is_write = input('Would you like to write the results to the camera? (Y/N)')
        is_write = 'Y' in is_write.upper()
        if is_write:
            print('Writing calibration to device.')

            if product_line == 'L500':
                l500_send_command(dev, WRITE_TABLE, 0, 0, 0, 0, imu_calib_table)
            else:
                calibration_table = get_calibration_table(imu_calib_table)
                eeprom = get_eeprom(calibration_table)
                write_eeprom_to_camera(eeprom, serial_no)

            print('Done.')
        else:
            print('Abort writing to device')

    except Exception as e:
        print ('\nError: %s' % e)
    finally:
        if os.name == 'posix' and old_settings is not None:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

    """
    wtw = dot(transpose(w),w)
    wtwi = np.linalg.inv(wtw)
    print(wtwi)
    X = dot(wtwi, Y)
    print(X)
    """
if __name__ == '__main__':
    main()

 

 

运行效果:

(wind_2021) PS E:\LibRealsense\librealsense-master\tools\rs-imu-calibration> python rs-imu-calibration.py
waiting for realsense device...
  Device PID:  0B3A
  Device name:  Intel RealSense D435I
  Serial number:  843112073831
  Product Line:  D400
  Firmware version:  05.10.13.00
Start interactive mode:
FOUND ACCEL with fps=63
FOUND GYRO with fps=200
-------------------------
*** Press ESC to Quit ***
-------------------------

Align to direction:  [ 0. -1.  0.]   Mounting screw pointing down, device facing out
 Status.collect_data[...................]m

Direction data collected.
Align to direction:  [1. 0. 0.]   Mounting screw pointing left, device facing out
   Status.rotate:           [ 1.0182  0.9982 -0.0566]:                 [False False  True]Traceback (most recent call last):

:           [ 1.0172  0.9982 -0.0566]:                 [False False  True]
(wind_2021) PS E:\LibRealsense\librealsense-master\tools\rs-imu-calibration>

 

 

代码来自:librealsense-master\tools\rs-imu-calibration\rs-imu-calibration.py

#######################

标签:d435i,gyro,self,realsense,print,imu,np,data
From: https://www.cnblogs.com/herd/p/17357445.html

相关文章

  • python 串口读取IMU
    #coding:UTF-8#Version:V1.0.1importserialACCData=[0.0]*8GYROData=[0.0]*8AngleData=[0.0]*8FrameState=0#WhatisthestateofthejudgmentBytenum=0#ReadthenumberofdigitsinthisparagraphCheckSum=0#Sumcheckbita=[0.0......
  • realsense d435i获取相机姿态数据
    获取RealSenseD435i相机的姿态数据:安装RealSenseSDK2.0:您可以从官方网站(https://www.realsense.com/)下载并操使用RealSenseSDK,也可以现有帐RealSenseSDK.html连接相机:将RealSenseD435i相机连接到计算机,并确保相机的USB接口已正确连接。打开RealSenseViewer:启动......
  • PAT Advanced 1007. Maximum Subsequence Sum
    PATAdvanced1007.MaximumSubsequenceSum1.ProblemDescription:Givenasequenceof\(K\)integers{\(N_1,N_2,...,N_K\)}.Acontinuoussubsequenceisdefinedtobe{\(N_i,N_{i+1},...,N_j\)}where\(1≤i≤j≤K\).TheMaximumSubsequencei......
  • 【论文】Range-Focused Fusion of Camera-IMU-UWB for Accurate and Drift-Reduced Lo
    ##Abstract![请添加图片描述](https://img-blog.csdnimg.cn/50c3a8cc38904318b361ef50ea49b889.png)##I.INTRODUCTION为什么需要添加UWB?因为传统的VIO会由于传感器的噪声和计算误差产生累计偏移。所以需要GPS或者是UWB来做修正。UWB则是专注于室内。以前对于UWB的使用一般都是......
  • 基于simulink的MPPT控制器性能仿真,MPPT使用S函数实现
    1.算法仿真效果matlab2022a仿真结果如下:2.算法涉及理论知识概要MPPT控制器的全称是“最大功率点跟踪”(MaximumPowerPointTracking)太阳能控制器,是传统太阳能充放电控制器的升级换代产品。MPPT控制器能够实时侦测太阳能板的发电电压,并追踪最高电压电流值(VI),使系统以最大功率......
  • 基于simulink的MPPT控制器性能仿真,MPPT使用S函数实现
    1.算法仿真效果matlab2022a仿真结果如下:    2.算法涉及理论知识概要         MPPT控制器的全称是“最大功率点跟踪”(MaximumPowerPointTracking)太阳能控制器,是传统太阳能充放电控制器的升级换代产品。MPPT控制器能够实时侦测太阳能板的发电电压,并追踪......
  • 基于EKF扩展卡尔曼滤波算法的永磁同步电机PMSM无传感器矢量控制Simulink仿真模型。
    基于EKF扩展卡尔曼滤波算法的永磁同步电机PMSM无传感器矢量控制Simulink仿真模型。1.依据PMSM的数学模型搭建电机模型2.双闭环dq解耦控制,转速外环,转矩内环3.EKF算法对电机的转子电角度和机械转速进行估算ID:2465668485383219......
  • simulink模块汇总梳理 , 智能座舱域在AUTOSAR 框架中应用层的开发依赖于simulink建模,
    simulink模块汇总梳理,智能座舱域在AUTOSAR框架中应用层的开发依赖于simulink建模,通过simulink模型设计加上C代码生成来完成繁杂的应用层开发。因此simulink计算组件的充分了解必不可少。本商品梳理了近乎全部的simulink模块,帮助您短时间对未知模块进行检索及了解,倘若您需要模......
  • 自动驾驶多传感器联合标定系列之IMU到车体坐标系的标定工程 , 本商品在已知GNSS/GPS到
    自动驾驶多传感器联合标定系列之IMU到车体坐标系的标定工程,本商品在已知GNSS/GPS到车体坐标系的外参前提下,根据GNSS/GPS的定位信息与IMU信息完成IMU到GNSS/GPS的外参标定,并进一步获得IMU到车体坐标系的外参标定。本商品提供两种标定模式:车辆直线运动及自由运动,这两种模式下的注......
  • carsim+simulink联合仿真实现变道
    carsim+simulink联合仿真实现变道包含路径规划算法+mpc轨迹跟踪算法可选simulink版本和c++版本算法可以适用于弯道道路,弯道车道保持,弯道变道carsim内规划轨迹可视化Carsim2020.0Matlab2017bYID:67150635596250714......