master
François Boulogne 2 months ago
commit b41eb58ca3

132
.gitignore vendored

@ -0,0 +1,132 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
.coverage.*
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
docs/autosummary/
# Jupyter Notebook
.ipynb_checkpoints
*.ipynb
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; see https://github.com/pyenv/pyenv/issues/528
.python-version
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and https://pdm.fming.dev
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
env.d/
venv.d/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# VSCode settings
.vscode/

@ -0,0 +1,4 @@
labhw
=====

@ -0,0 +1,8 @@
#!/usr/bin/env python
import labhw
devices = labhw.list_USB_devices()
for k, v in devices.items():
print(k, v)

@ -0,0 +1,416 @@
import serial
import time
import thorlabs_apt_protocol as apt
import numpy as np
import pandas as pd
class thorlab_lin_stage_LTS_150:
def __init__(self, portn="/dev/ttyUSB0", baud=115200, dest=0x50, source=1, chan_ident=1):
self.portn=portn
self.baud=baud
self.source=source
self.dest=dest
self.chan_ident=chan_ident
def initialize(self):
port = serial.Serial(self.portn, self.baud, rtscts=True, timeout=0.1)
port.rts = True
port.reset_input_buffer()
port.reset_output_buffer()
port.rts = False
# This message is sent on start up to notify the controller of the
# source and destination addresses. A client application must send
# this message as part of its initialization process.
port.write(apt.hw_no_flash_programming(self.dest,self.source))
self.port=port
print('Device Initialized')
return port
#%%
def prnt(self, msg_list):
unpacker = apt.Unpacker(self.port)
all_msg=[]
for msg in unpacker:
all_msg.append(msg)
print(msg)
all_msg=np.array(all_msg, dtype=object)
print(all_msg, all_msg.shape)
all_msg=pd.DataFrame(all_msg, columns=msg_list)
return all_msg
#%%
#(msg='mot_move_completed', msgid=1124, dest=1, source=80, chan_ident=1, position=14279384, velocity=0, forward_limit_switch=False, reverse_limit_switch=False, moving_forward=True, moving_reverse=True, jogging_forward=False, jogging_reverse=False, motor_connected=False, homing=True, homed=False, tracking=False, interlock=False, settled=False, motion_error=False, motor_current_limit_reached=False, channel_enabled=False)
def msgp_jog(self):
msg_list=['msg', 'msgid', 'dest', 'source', 'chan_ident', 'position', 'velocity',
'forward_limit_switch', 'reverse_limit_switch', 'moving_forward',
'moving_reverse', 'jogging_forward', 'jogging_reverse',
'motor_connected', 'homing', 'homed', 'tracking', 'interlock',
'settled', 'motion_error', 'motor_current_limit_reached', 'channel_enabled',
'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7', 'x8', 'x9', 'x10', 'x11', 'x12', 'x13', 'x14']
# for jog
try:
p_jog=self.prnt(msg_list)
return p_jog
except:
print('Error reading fun: msgp_jog')
raise
#%%
# hw_get_info(msg='hw_get_info', msgid=6, dest=1, source=80, serial_number=45218804, model_number=b'LTS150\x00\x00', type=16, firmware_version=[3, 0, 10], hw_version=3, mod_state=0, nchs=1)
#Sent to request hardware information from the controller
def msgp_hardware(self):
msg_list2=['msg', 'msgid', 'dest', 'source', 'serial_number',
'model_number', 'type', 'firmware_version', 'hw_version', 'mod_state', 'nchs']
# for hardwareinfo
self.port.write(apt.hw_req_info(self.dest, self.source))
try:
p_hardware=self.prnt(msg_list2)
return p_hardware
except:
print('Error setting fun: msgp_hardware')
raise
#%% MGMSG_MOT_SET_VELPARAMS
#[mot_get_velparams(msg='mot_get_velparams', msgid=1045, dest=1, source=80, chan_ident=1, min_velocity=0, acceleration=90121, max_velocity=439804651)]
def msgp_velparam(self):
msg_list3=['msg', 'msgid', 'dest', 'source', 'chan_ident',
'min_velocity', 'acceleration', 'max_velocity']
# for hardwareinfo
self.port.write(apt.mot_req_velparams(self.dest,self.source, self.chan_ident))
try:
p_velparam=self.prnt(msg_list3)
return p_velparam
except:
print('Error reading fun: msgs_velparam')
raise
return 0
# Used to set the trapezoidal velocity parameters for the specified
# motor channel. For DC servo controllers, the velocity is set in
# encoder counts/sec and acceleration is set in encoder counts/sec/sec.
# For stepper motor controllers the velocity is set in microsteps/sec
# and acceleration is set in microsteps/sec/sec.
def msgs_velparam(self, min_velocity, acceleration, max_velocity):
# for hardwareinfo
try:
self.port.write(apt.mot_set_velparams(self.dest, self.source,
self.chan_ident,
min_velocity=min_velocity,
acceleration=acceleration,
max_velocity=max_velocity))
print('Done')
except:
print('Error setting fun: msgs_velparam')
raise
return 0
#%% MGMSG_MOT_SET_JOGPARAMS
# mot_get_jogparams(msg='mot_get_jogparams', msgid=1048, dest=1, source=80, chan_ident=1, jog_mode=2, step_size=2048000, min_velocity=0, acceleration=45061, max_velocity=219902326, stop_mode=2)
def msgp_jogparam(self):
msg_list4=[
'msg', 'msgid', 'dest', 'source', 'chan_ident', 'jog_mode', 'step_size',
'min_velocity', 'acceleration', 'max_velocity', 'stop_mode']
# for hardwareinfo
self.port.write(apt.mot_req_jogparams(self.dest,self.source, self.chan_ident))
try:
p_jogparam=self.prnt(msg_list4)
return p_jogparam
except:
print('Error reading fun: msgp_jogparam')
raise
return 0
# Used to set the velocity jog parameters for the specified motor
# channel, For DC servo controllers, values set in encoder counts.
# For stepper motor controllers the values is set in microsteps
def msgs_jogparam(self, step_size, min_velocity, acceleration,
max_velocity,jog_mode=2, stop_mode=2):
# for hardwareinfo
try:
self.port.write(apt.mot_set_jogparams(
self.dest, self.source,
self.chan_ident,
jog_mode=jog_mode,
step_size=step_size,
stop_mode=stop_mode,
min_velocity=min_velocity,
acceleration=acceleration,
max_velocity=max_velocity))
print('Done')
except:
print('Error setting fun: msgs_jogparam')
raise
return 0
#%% MGMSG_MOT_SET_GENMOVEPARAMS
# [mot_get_genmoveparams(msg='mot_get_genmoveparams', msgid=1084, dest=1, source=80, chan_ident=1, backlash_distance=20480)]
def msgp_backlash(self):
msg_list5=['msg', 'msgid', 'dest', 'source', 'chan_ident', 'backlash_distance']
# for hardwareinfo
self.port.write(apt.mot_req_genmoveparams(self.dest,self.source, self.chan_ident))
try:
p_backlash=self.prnt(msg_list5)
return p_backlash
except:
print('Error reading fun: msgp_backlash')
raise
return 0
# Used to set the general move parameters for the specified motor
#channel. At this time this refers specifically to the backlash settings
def msgs_backlash(self, backlash_distance):
# for hardwareinfo
try:
self.port.write(apt.mot_set_genmoveparams(
self.dest, self.source,
self.chan_ident,
backlash_distance=backlash_distance))
print('Done backlash_distance')
except:
print('Error setting fun: msgs_backlash')
raise
return 0
#%% MGMSG_MOT_SET_MOVERELPARAMS
# [mot_get_moverelparams(msg='mot_get_moverelparams', msgid=1095, dest=1, source=80, chan_ident=1, relative_distance=0)]
def msgp_relmov(self):
msg_list7=['msg', 'msgid', 'dest', 'source', 'chan_ident', 'relative_distance']
# for hardwareinfo
self.port.write(apt.mot_req_moverelparams(self.dest,self.source, self.chan_ident))
try:
p_relmov=self.prnt(msg_list7)
return p_relmov
except:
print('Error reading fun: msgp_relmov')
raise
return 0
# Used to set the general move parameters for the specified motor
#channel. At this time this refers specifically to the backlash settings
def msgs_relmov(self, relative_distance= 0):
# for hardwareinfo
try:
self.port.write(apt.mot_set_moverelparams(
self.dest, self.source,
self.chan_ident,
relative_distance=relative_distance))
print('Done relative_distance')
except:
print('Error setting fun: msgs_relmov')
raise
return 0
#%% MGMSG_MOT_SET_MOVEABSPARAMS
def msgp_absmov(self):
msg_list8=['msg', 'msgid', 'dest', 'source', 'chan_ident', 'absolute_position']
# for hardwareinfo
self.port.write(apt.mot_req_moveabsparams(self.dest,self.source, self.chan_ident))
try:
p_absmov=self.prnt(msg_list8)
return p_absmov
except:
print('Error reading fun: msgp_absmov')
raise
return 0
def msgs_absmov(self, absolute_position= 0):
# for hardwareinfo
try:
self.port.write(apt.mot_set_moveabsparams(
self.dest, self.source,
self.chan_ident,
absolute_position=absolute_position))
print('Done absolute_position')
except:
print('Error setting fun: msgs_absmov')
raise
return 0
#%% MGMSG_MOT_SET_HOMEPARAMS
def msgp_homeparam(self):
msg_list9=['msg', 'msgid', 'dest', 'source','chan_ident', 'home_dir', 'limit_switch', 'home_velocity', 'offset_distance']
# for hardwareinfo
self.port.write(apt.mot_req_homeparams(self.dest,self.source, self.chan_ident))
try:
p_homeparam=self.prnt(msg_list9)
return p_homeparam
except:
print('Error reading fun: msgp_homeparam')
raise
return 0
def msgs_homeparam(self, home_velocity, offset_distance,
home_dir=2, limit_switch=1, ):
try:
self.port.write(apt.mot_set_homeparams(
self.dest, self.source,
self.chan_ident,
home_dir=home_dir,
limit_switch=limit_switch,
home_velocity=home_velocity,
offset_distance=offset_distance))
print('Done homeparam')
except:
print('Error setting fun: msgs_homeparam')
raise
return 0
#%% MGMSG_MOT_SET_LIMSWITCHPARAMS
# mot_get_limswitchparams(msg='mot_get_limswitchparams', msgid=1061, dest=1, source=80, chan_ident=1, cw_hardlimit=2, ccw_hardlimit=2, cw_softlimit=1228800, ccw_softlimit=409600, soft_limit_mode=0)
def msgp_limitsw(self):
msg_list10=['msg', 'msgid', 'dest', 'source', 'chan_ident','cw_hardlimit', 'ccw_hardlimit', 'cw_softlimit', 'ccw_softlimit', 'soft_limit_mode']
self.port.write(apt.mot_req_limswitchparams(self.dest,self.source, self.chan_ident))
try:
p_limitsw=self.prnt(msg_list10)
return p_limitsw
except:
print('Error reading fun: msgp_limitsw')
raise
return 0
def msgs_limitsw(self, cw_softlimit, ccw_softlimit, cw_hardlimit=2, ccw_hardlimit=2, soft_limit_mode=0):
try:
self.port.write(apt.mot_set_limswitchparams(
self.dest, self.source,
self.chan_ident,
cw_hardlimit=cw_hardlimit,
ccw_hardlimit=ccw_hardlimit,
cw_softlimit=cw_softlimit,
ccw_softlimit=ccw_softlimit,
soft_limit_mode=soft_limit_mode))
print('Done limitsw')
except:
print('Error setting fun: msgs_limitsw')
raise
return 0
#%%
def update(self):
p_hardware=self.msgp_hardware()
self.var_serial_number=p_hardware['serial_number']
self.var_model_number=p_hardware['model_number']
self.var_type=p_hardware['type']
self.var_firmware_version=p_hardware['firmware_version']
self.var_hw_version=p_hardware['hw_version']
self.var_no_channels=p_hardware['nchs']
p_velparam=self.msgp_velparam()
self.var_min_velocity=p_velparam['min_velocity']
self.var_acceleration=p_velparam['acceleration']
self.var_max_velocity=p_velparam['max_velocity']
p_jogparam=self.msgp_jogparam()
self.var_jog_mode=p_jogparam['jog_mode']
self.var_jog_step_size=p_jogparam['step_size']
self.var_jog_min_velocity=p_jogparam['min_velocity']
self.var_jog_acceleration=p_jogparam['acceleration']
self.var_jog_max_velocity=p_jogparam['max_velocity']
self.var_jog_stop_mode=p_jogparam['stop_mode']
p_backlash=self.msgp_backlash()
self.var_backlash_distance=p_backlash['backlash_distance']
p_relmov=self.msgp_relmov()
self.var_relative_distance=p_relmov['relative_distance']
p_absmov=self.msgp_absmov()
self.var_absolute_position=p_absmov['absolute_position']
p_homeparam=self.msgp_homeparam()
self.var_home_direction=p_homeparam['home_dir']
self.var_home_limit_switch=p_homeparam['limit_switch']
self.var_home_velocity=p_homeparam['home_velocity']
self.var_home_offset_distance=p_homeparam['offset_distance']
#self.jog_forward()
#time.sleep(10)
self.move_position(10)
time.sleep(5)
self.move_position(0)
time.sleep(2)
p_jog = self.msgp_jog()
time.sleep(2)
p_jogparam = self.msgp_jogparam()
self.var_jog_step_size = p_jogparam['step_size']
self.var_jog_position = p_jog['position']
self.var_jog_velocity = p_jog['velocity']
self.var_jog_homed = p_jog['homed']
print('Data Updated >>>>>>>>>>>>>>>>>>>')
def work_back(self):
self.port.write(apt.mot_move_home(self.dest, self.source, self.chan_ident))
self.port.write(apt.mot_move_absolute(self.dest, self.source, self.chan_ident))
def set_speed(self,
min_velocity=1_000_000,
acceleration=1_000_000,
max_velocity=1_000_000_000):
self.port.write(apt.mot_set_velparams(self.dest,
self.source,
self.chan_ident,
min_velocity=min_velocity,
acceleration=acceleration,
max_velocity=max_velocity,))
def move_position(self, relative_distance=1): # 0 to 67
self.relative_distance=int(float(relative_distance)*1_000_000/150*61.6)
self.msgs_relmov(relative_distance=self.relative_distance)
self.port.write(apt.mot_move_absolute(self.dest,self.source, self.chan_ident))
def jog_forward(self):
"""Jog toward end direction"""
self.port.write(apt.mot_move_jog(self.dest,self.source, self.chan_ident, direction=1))
def jog_backward(self):
"""Jog toward home direction."""
self.port.write(apt.mot_move_jog(self.dest,self.source, self.chan_ident, direction=2))
def stop_hard(self):
self.port.write(apt.mot_move_stop(self.dest, self.source, self.chan_ident, stop_mode=1))
def stop_soft(self):
self.port.write(apt.mot_move_stop(self.dest, self.source, self.chan_ident, stop_mode=2))
def disconnect(self):
self.port.close()
print('Device disconnected')
def move_home(self):
"""Move back to home."""
self.move_position(0)

@ -0,0 +1,72 @@
import subprocess
from pathlib import Path
import ast
import re
import serial.tools.list_ports
def list_USB_devices():
"""
"""
full_pattern = re.compile('[^a-zA-Z0-9\\\/]|_')
ports = serial.tools.list_ports.comports()
result = {}
for p in ports:
if p.manufacturer:
key = p.manufacturer
if p.serial_number:
key += '_' + p.serial_number
key = re.sub(full_pattern, '_', key)
result[key] = p.device
return result
def list_USB_devices_old():
result = {}
path = Path('/sys/bus/usb/devices/')
for syspath in path.glob('usb*'):
for devfile in syspath.glob('**/dev'):
if devfile.is_file():
devpath = devfile.parent
# Get devnames
proc = subprocess.Popen(['udevadm', 'info', '-q', 'name', '-p', devpath],
stdout=subprocess.PIPE)
output = proc.stdout.read()
devname = output.decode('utf-8')
if not devname.startswith('bus') and not devname.startswith('input'):
# get properties
proc = subprocess.Popen(['udevadm', 'info', '-q', 'property', '--export', '-p', devpath],
stdout=subprocess.PIPE)
output = proc.stdout.read()
output = output.decode('utf-8')
# Not nice, but convert the output into dict
output = output.replace("='", "\':\'")
output = re.sub('^', "\'", output, flags=re.MULTILINE)
output = '{' + output[:-1].replace('\n', ', ') + '}'
properties = ast.literal_eval(output)
if 'ID_SERIAL' in properties:
p = str(path.joinpath('/dev', devname))
p = p.replace('\n', '')
result[properties['ID_SERIAL']] = p
return result
def update_USB_devices(devices):
"""
Find paths for devices.
For each device passed by the user, use the automatic
detection, and bind the path.
devices: dictionnary
"""
res = list_USB_devices()
for device in devices:
name = devices[device]['name']
if name in res:
path = res[name]
devices[device]['path'] = path
return devices

@ -0,0 +1,3 @@
from .LTS import *
from .USBdev import *
from .interactive import *

@ -0,0 +1,62 @@
import ast
import inspect
def get_functions_name_in_main(source_code):
"""
Return the functions defined in the main of a source code.
source_code: content to analyze
"""
tree = ast.parse(source_code)
functions_in_main = []
class MainVisitor(ast.NodeVisitor):
def visit_If(self, node):
# Check if the condition is: if __name__ == '__main__':
if isinstance(node.test, ast.Compare) and \
isinstance(node.test.left, ast.Name) and node.test.left.id == "__name__" and \
isinstance(node.test.ops[0], ast.Eq) and \
isinstance(node.test.comparators[0], ast.Constant) and node.test.comparators[0].value == "__main__":
# Visit the body of the if statement
for n in node.body:
if isinstance(n, ast.FunctionDef):
functions_in_main.append(n.name)
elif isinstance(n, ast.AsyncFunctionDef):
functions_in_main.append(n.name)
else:
self.generic_visit(n)
else:
self.generic_visit(node)
visitor = MainVisitor()
visitor.visit(tree)
return functions_in_main
def help(filepath, globals_values):
"""
Print helper functions for interactive mode.
filepath: path of the file to analyze, __file__
globals_values: call globals()
"""
with open(filepath, "r") as file:
source_code = file.read()
functions = get_functions_name_in_main(source_code)
print('~' * 20)
for func_name in functions:
# Utiliser inspect pour obtenir la fonction elle-même
func = globals_values.get(func_name)
if func:
# Obtenir la signature de la fonction
signature = inspect.signature(func)
# Obtenir la docstring de la fonction
docstring = inspect.getdoc(func)
# Extraire la première ligne de la docstring
first_line_of_docstring = docstring.split('\n')[0] if docstring else "No docstring available"
# Afficher le nom de la fonction, sa signature et la première ligne de la docstring
print(f"{func_name}{signature}")
print(f" {first_line_of_docstring}")
print('~' * 20)

@ -0,0 +1,47 @@
import serial
import logging
import time
def write_and_check(socket, command, expected_reply=None, max_retry=3):
"""
Write a command on arduino and check for the reply.
socket -- serial socket for arduino
command -- str
expected_reply -- str. If None, no check performed.
"""
command += '\n'
logging.debug('clear buffer...')
_ = socket.read_all()
socket.write(command.encode())
logging.debug(f'check for reply of command {command}')
reply = socket.read_until()
logging.debug(f'reply is {reply}')
if expected_reply:
attempt = 0
expected_reply += '\r\n'
while attempt < max_retry and reply != expected_reply.encode():
logging.debug('Not the expected reply, wait...')
time.sleep(.2)
logging.debug('clear buffer...')
_ = socket.read_all()
logging.debug('retry...')
socket.write(command.encode())
#time.sleep(.2)
reply = socket.read_until()
logging.debug(f'{command} reply {reply}')
attempt += 1
if attempt == max_retry:
raise RuntimeError
def write_and_read(socket, command):
command += '\n'
socket.write(command.encode())
logging.debug(f'{command} sent')
time.sleep(.2)
reply = socket.read_until()
reply = reply.decode()
logging.debug(f'{command} reply {reply}')
return reply

@ -0,0 +1,5 @@
setuptools
pyserial
thorlabs_apt_protocol
numpy
pandas

@ -0,0 +1,38 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
from os import path
# Function to read the requirements from the requirements.txt file
def parse_requirements(filename):
with open(filename, 'r') as file:
return file.read().splitlines()
# Get the long description from the README file
here = path.abspath(path.dirname(__file__))
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = f.read()
setup(
name = 'labhw',
version = '0.1.1',
author = "Francois Boulogne",
license = "BSD",
author_email = "devel@sciunto.org",
description = "Manage my lab hardware",
long_description=long_description,
long_description_content_type='text/markdown',
scripts = ['bin/listUSBdevice.py',
],
# Automatically find packages in the current directory
packages=find_packages(), # Required
# Include additional files specified in MANIFEST.in
include_package_data=True, # Optional
# Define your dependencies in a separate file
install_requires=parse_requirements('requirements.txt'), # Optional
)
Loading…
Cancel
Save