WAPT Setuphelpers Usage

setuphelpers.all_files(rootdir, pattern=None)[source]

Recursively return all files from rootdir and sub directories matching the (dos style) pattern (example: *.exe)
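
To make the matching behaviour concrete, here is a minimal standalone sketch built on the standard library. The name all_files_sketch is hypothetical and this is not the setuphelpers implementation; treating pattern=None as "match every file" is an assumption.

```python
import fnmatch
import os


def all_files_sketch(rootdir, pattern=None):
    """Return every file under rootdir whose name matches pattern.

    Hypothetical stand-in for illustration; pattern=None is assumed
    to mean "match every file".
    """
    matches = []
    for dirpath, _dirnames, filenames in os.walk(rootdir):
        for name in filenames:
            if pattern is None or fnmatch.fnmatch(name, pattern):
                matches.append(os.path.join(dirpath, name))
    return matches
```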

setuphelpers.application_data()[source]
setuphelpers.autoremove_apt()[source]
setuphelpers.autoremove_yum()[source]
setuphelpers.copytree2(src, dst, ignore=None, onreplace=<function default_skip>, oncopy=<function default_oncopy>, enable_replace_at_reboot=True, follow_symlinks=False)[source]

Copy the src directory to the dst directory. dst is created if it does not exist. src can be relative to the installation temporary directory.

oncopy is called for each file copy; if it returns False, the copy is skipped. onreplace is called when a file is about to be overwritten.

Args:

src (str): path to source directory (absolute path or relative to package extraction tempdir)
dst (str): path to target directory (created if not present)
ignore (func): callback func(root_dir, filenames) which returns names to ignore
onreplace (func): callback func(src, dst) returning a boolean, called when a file will be replaced to decide what to do. Default is to not replace if the target exists. Can be default_overwrite, default_overwrite_older or a custom function.
oncopy (func): callback func(msg, src, dst) called when a file is copied. Default is to log the operation at debug level.
enable_replace_at_reboot (boolean): if True, files which are locked will be scheduled for replacement at next reboot

Returns:

Raises:

>>> copytree2(r'c:\tranquilit\wapt\tests',r'c:\tranquilit\wapt\tests2')
>>> isdir(r'c:\tranquilit\wapt\tests2')
True
>>> remove_tree(r'c:\tranquilit\wapt\tests2')
>>> isdir(r'c:\tranquilit\wapt\tests2')
False
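
The onreplace hook receives (src, dst) and returns a boolean. A custom callback could look like the sketch below, which allows the overwrite only when the source is strictly newer. The name replace_if_newer is hypothetical, and comparing modification times is an assumption made for illustration, not a description of how default_overwrite_older is implemented.

```python
import os


def replace_if_newer(src, dst):
    """Hypothetical onreplace callback: allow the overwrite only when
    src was modified more recently than dst."""
    return os.stat(src).st_mtime > os.stat(dst).st_mtime
```

Under these assumptions it would be passed as copytree2(src, dst, onreplace=replace_if_newer).
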
setuphelpers.currentdate()[source]

date as string YYYYMMDD

>>> currentdate()
'20161102'
setuphelpers.currentdatetime()[source]

date/time as YYYYMMDD-hhmmss

>>> currentdatetime()
'20161102-193600'
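
The two formats used by currentdate() and currentdatetime() can be reproduced with strftime. This is only a sketch of the formats; the helper names format_date and format_datetime are hypothetical.

```python
import datetime


def format_date(moment):
    """Format a datetime as YYYYMMDD."""
    return moment.strftime('%Y%m%d')


def format_datetime(moment):
    """Format a datetime as YYYYMMDD-hhmmss."""
    return moment.strftime('%Y%m%d-%H%M%S')


stamp = datetime.datetime(2016, 11, 2, 19, 36, 0)
print(format_date(stamp))      # 20161102
print(format_datetime(stamp))  # 20161102-193600
```
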
setuphelpers.dateof(adatetime)[source]
setuphelpers.datetime2isodate(adatetime=None)[source]
setuphelpers.default_gateway()[source]

Return the current default IPv4 gateway

setuphelpers.default_oncopy(msg, src, dst)[source]
setuphelpers.default_overwrite(src, dst)[source]
setuphelpers.default_overwrite_older(src, dst)[source]
setuphelpers.default_skip(src, dst)[source]
setuphelpers.dir_is_empty(path)[source]

Check if a directory is empty

setuphelpers.dmi_info()[source]
setuphelpers.ensure_dir(filename)[source]

Make sure the directory of filename exists on disk. Create it if it does not.

Intermediate directories are created as well.

Args:

filename (str): path to a future file for which to create directory.

Returns:

None
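
A minimal sketch of the same idea with the standard library is shown below. The name ensure_dir_sketch is hypothetical and the code is not taken from setuphelpers.

```python
import os


def ensure_dir_sketch(filename):
    """Create the parent directory of filename, including any missing
    intermediate directories. Hypothetical stand-in for illustration."""
    directory = os.path.dirname(filename)
    if directory and not os.path.isdir(directory):
        os.makedirs(directory)
```

Note that only the directory is created; the file itself is left for the caller to write.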

setuphelpers.ensure_list(csv_or_list, ignore_empty_args=True, allow_none=False)[source]

If the argument is not a list, return a list built from a csv string.

Args:

csv_or_list (list or str): list, or string of comma separated values
ignore_empty_args (bool): if True, empty strings found in the csv are not appended to the list.
allow_none (bool): if True and csv_or_list is None, return None; otherwise return an empty list.

Returns:

list
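
The behaviour described by the three arguments can be sketched as follows. The name ensure_list_sketch is hypothetical; splitting on commas and stripping surrounding whitespace are assumptions, not confirmed details of the real function.

```python
def ensure_list_sketch(csv_or_list, ignore_empty_args=True, allow_none=False):
    """Hypothetical re-implementation for illustration only."""
    if csv_or_list is None:
        return None if allow_none else []
    if isinstance(csv_or_list, (list, tuple)):
        return list(csv_or_list)
    # Assumption: items are separated by commas and surrounding
    # whitespace is not significant.
    items = [item.strip() for item in str(csv_or_list).split(',')]
    if ignore_empty_args:
        items = [item for item in items if item]
    return items
```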

setuphelpers.ensure_unicode(data)[source]

Return a unicode string from a data object. It is sometimes difficult to know in advance what we will get from command line application output.

This is to ensure we get a (not always accurate) representation of the data mainly for logging purpose.

Args:

data: either str or unicode or object having a __str__ or WindowsError or Exception

Returns:

unicode: unicode string representing the data

>>> ensure_unicode(str('éé'))
u'éé'
>>> ensure_unicode(u'éé')
u'éé'
>>> ensure_unicode(Exception("test"))
u'Exception: test'
>>> ensure_unicode(Exception())
u'Exception: '
setuphelpers.error(reason)[source]

Raise a WAPT fatal error

setuphelpers.file_is_locked(path, timeout=5)[source]

Check if a file is locked. Waits timeout seconds for the release.

setuphelpers.filecopyto(filename, target)[source]

Copy a file from an absolute path or from the package temporary directory to the target directory.

If the file is a .dll or .exe, the original and new versions are logged.

Args:

filename (str): absolute path to the file to copy, or path relative to the temporary package install content directory.
target (str): absolute path to the target, either a full filename or a directory name.

>>> if not os.path.isfile('c:/tmp/fc.test'):
...     with open('c:/tmp/fc.test','wb') as f:
...         f.write('test')
>>> if not os.path.isdir('c:/tmp/target'):
...    os.mkdir('c:/tmp/target')
>>> if os.path.isfile('c:/tmp/target/fc.test'):
...    os.unlink('c:/tmp/target/fc.test')
>>> filecopyto('c:/tmp/fc.test','c:/tmp/target')
>>> os.path.isfile('c:/tmp/target/fc.test')
True
setuphelpers.fileisodate(filename)[source]

Return the last modification date and time of filename, in local time

setuphelpers.find_all_files(rootdir, include_patterns=None, exclude_patterns=None, include_dirs=None, exclude_dirs=None, excludes_full=None)[source]

Generator which recursively finds all files from rootdir and sub directories matching the (dos style) patterns (example: *.exe)

Args:

rootdir (str): root dir where to start looking for files
include_patterns (str or list): list of glob patterns of files to return
exclude_patterns (str or list): list of glob patterns of files to exclude (if a file is both in include and exclude, it is excluded)
include_dirs (str or list): list of glob directory patterns to return
exclude_dirs (str or list): list of glob directory patterns to exclude (if a dir is both in include and exclude, it is excluded)
excludes_full (list): list of exact file paths (relative to package root) to exclude from manifest.

>>> for fn in find_all_files(r'c:\tmp', '*.txt'):
...     print(fn)
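
The rule that exclusion wins over inclusion can be sketched with a small generator. The names find_files_sketch and _as_list are hypothetical; only the two file pattern arguments are modelled, and an empty include list is assumed to match everything.

```python
import fnmatch
import os


def _as_list(patterns):
    """Accept None, a single pattern or a list of patterns."""
    if patterns is None:
        return []
    if isinstance(patterns, str):
        return [patterns]
    return list(patterns)


def find_files_sketch(rootdir, include_patterns=None, exclude_patterns=None):
    """Yield files under rootdir; exclusion wins over inclusion.

    Hypothetical stand-in: directory patterns and excludes_full are
    not modelled here.
    """
    includes = _as_list(include_patterns)
    excludes = _as_list(exclude_patterns)
    for dirpath, _dirnames, filenames in os.walk(rootdir):
        for name in filenames:
            if includes and not any(fnmatch.fnmatch(name, p) for p in includes):
                continue
            if any(fnmatch.fnmatch(name, p) for p in excludes):
                continue
            yield os.path.join(dirpath, name)
```
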
setuphelpers.find_processes(process_name)[source]

Return the list of Process objects whose name is process_name

Args:

process_name (str): process name to lookup

Returns:

list: list of processes (Process) named process_name or process_name.exe

>>> [p.pid for p in find_processes('explorer')]
[2756, 4024]
setuphelpers.get_code_name_version()[source]
setuphelpers.get_computer_groups()[source]

Try to find the computer in the Active Directory and return the list of groups

setuphelpers.get_computername()[source]

Return host name in lowercase (without domain part)

setuphelpers.get_current_user()[source]

Get the login name for the current user.

>>> get_current_user()
u'htouvet'

setuphelpers.get_debian_version()[source]
setuphelpers.get_default_gateways()[source]
setuphelpers.get_disk_free_space(filepath)[source]

Returns the number of free bytes on the drive that filepath is on
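
One way to obtain the same figure with the standard library is shutil.disk_usage. The helper name free_bytes is hypothetical, and falling back to the parent directory for a file that does not exist yet is an assumption of this sketch.

```python
import os
import shutil


def free_bytes(filepath):
    """Return the free bytes on the volume holding filepath.
    Hypothetical helper built on shutil.disk_usage."""
    # disk_usage needs an existing path, so fall back to the parent
    # directory when filepath itself does not exist yet.
    if os.path.exists(filepath):
        probe = filepath
    else:
        probe = os.path.dirname(os.path.abspath(filepath))
    return shutil.disk_usage(probe).free
```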

setuphelpers.get_distrib_linux()[source]
setuphelpers.get_distrib_version()[source]
setuphelpers.get_dns_servers()[source]
setuphelpers.get_domain_from_socket()[source]

Return main DNS domain of the computer

Returns:

str: domain name

>>> get_domain_from_socket()
u'tranquilit.local'
setuphelpers.get_domain_info(ldap_auth_server=None, use_ssl=True, force_tgt=True, hostname=None, domain=None, verify_cert_ldap=False)[source]

Return a dict with ad_site, ou and groups.

Warning: the gssapi search does not work if the reverse DNS record is not available for the AD.

setuphelpers.get_file_properties(fname, ignore_warning=True)[source]

Read all properties of the given file and return them as a dictionary.

Args:

fname : path to Windows executable or DLL

Returns:

dict: properties of executable

>>> xp = get_file_properties(r'c:\windows\explorer.exe')
>>> 'FileVersion' in xp and 'FileDescription' in xp
True
setuphelpers.get_fqdn()[source]
setuphelpers.get_groups(user)[source]
setuphelpers.get_host_architecture()[source]
setuphelpers.get_hostname()[source]
setuphelpers.get_kernel_version()[source]
setuphelpers.get_language(full_locale=False, separator='_')[source]

Get the os default locale (example: fr, en, pl, etc.)

>>> get_language()
'fr'
>>> get_language(full_locale=True)
'fr_FR'
>>> get_language(full_locale=True, separator='-').lower()
'fr-fr'
setuphelpers.get_last_logged_on_user()[source]
setuphelpers.get_local_IPs()[source]
setuphelpers.get_loggedinusers()[source]
setuphelpers.get_main_ip(host=None, hostv6=None)[source]
setuphelpers.get_os_name()[source]

Get the name of the current running operating system

Returns:

str: Windows, Linux, Darwin

>>> get_os_name()
'Windows'
setuphelpers.get_os_version()[source]
setuphelpers.get_proxies()[source]

Return system proxy with the urllib python library

>>> get_proxies()
{'http': 'http://srvproxy.ad.domain.lan:8080',
'https': 'http://srvproxy.ad.domain.lan:8080'}
setuphelpers.get_sha256(afile='', BLOCK_SIZE=1048576)[source]
setuphelpers.host_info()[source]
setuphelpers.host_info_common_unix()[source]

Read main workstation information, returned as a dict

Returns:

dict: main properties of host, networking and windows system

Changed in version 1.4.1: returned keys changed : dns_domain -> dnsdomain

>>> hi = host_info()
>>> 'computer_fqdn' in hi and 'connected_ips' in hi and 'computer_name' in hi and 'mac' in hi
True
setuphelpers.host_info_networking()[source]
setuphelpers.host_metrics()[source]

Frequently updated host data

setuphelpers.hours_minutes(hours)[source]
setuphelpers.httpdatetime2isodate(httpdate, localtime=False)[source]

Convert a date string as returned in http headers or mail headers to isodate (UTC)

>>> import requests
>>> last_modified = requests.head('http://wapt/wapt/Packages',headers={'cache-control':'no-cache','pragma':'no-cache'}).headers['last-modified']
>>> len(httpdatetime2isodate(last_modified)) == 19
True
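
The conversion can be approximated with email.utils from the standard library. The name http_date_to_isodate is hypothetical, only the UTC case is modelled (the localtime flag is ignored), and treating a date without zone information as UTC is an assumption.

```python
from datetime import timezone
from email.utils import parsedate_to_datetime


def http_date_to_isodate(httpdate):
    """Convert an HTTP or mail header date to an ISO string in UTC.
    Hypothetical stand-in for illustration."""
    parsed = parsedate_to_datetime(httpdate)
    if parsed.tzinfo is None:
        # Assumption: a date without usable zone information is UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc).replace(tzinfo=None).isoformat()


print(http_date_to_isodate('Wed, 02 Nov 2016 19:36:00 GMT'))  # 2016-11-02T19:36:00
```
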
setuphelpers.inifile_deleteoption(inifilename, section, key)[source]

Remove a key within the section of the inifile

Args:

inifilename (str): Path to the ini file
section (str): section
key (str): value key of option to remove

Returns:

boolean : True if the key/option has been removed

>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
True
>>> print(inifile_deleteoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
False
setuphelpers.inifile_deletesection(inifilename, section)[source]

Remove a section within the inifile

Args:

inifilename (str): Path to the ini file
section (str): section to remove

Returns:

boolean : True if the section has been removed

setuphelpers.inifile_hasoption(inifilename, section, key)[source]

Check if an option is present in section of the inifile

Args:

inifilename (str): Path to the ini file
section (str): section
key (str): value key to check

Returns:

boolean : True if the key exists

>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
True
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','dontexist'))
False
setuphelpers.inifile_hassection(inifilename, section)[source]

Check if a section is present in the inifile

Args:

inifilename (str): Path to the ini file
section (str): section

Returns:

boolean : True if the section exists

>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hassection('c:/tranquilit/wapt/tests/test.ini','global'))
True
setuphelpers.inifile_readstring(inifilename, section, key, default=None)[source]

Read a string parameter from inifile

>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version'))
1.1.2
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','undefaut','defvalue'))
defvalue
setuphelpers.inifile_writestring(inifilename, section, key, value)[source]

Write a string parameter to inifile

>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.1')
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version'))
1.1.1
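
The read and write helpers above can be approximated with configparser from the standard library. The names ini_write_string and ini_read_string are hypothetical, and nothing here is taken from the setuphelpers source; it is a sketch of the documented behaviour only.

```python
import configparser


def ini_write_string(inifilename, section, key, value):
    """Set section/key to value, creating the section if needed."""
    parser = configparser.RawConfigParser()
    parser.read(inifilename)  # a missing file is silently ignored
    if not parser.has_section(section):
        parser.add_section(section)
    parser.set(section, key, value)
    with open(inifilename, 'w') as handle:
        parser.write(handle)


def ini_read_string(inifilename, section, key, default=None):
    """Return the stored string, or default when it is missing."""
    parser = configparser.RawConfigParser()
    parser.read(inifilename)
    if parser.has_option(section, key):
        return parser.get(section, key)
    return default
```
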
setuphelpers.install_apt(package, allow_unauthenticated=False)[source]

Install package from APT repositories

setuphelpers.install_deb(path_to_deb)[source]

Install .deb package from file

setuphelpers.install_package_if_needed(package_name)[source]
setuphelpers.install_required_dependencies_apt()[source]
setuphelpers.install_rpm(package)[source]
setuphelpers.install_yum(package)[source]
setuphelpers.installed_softwares(keywords=None, name=None, ignore_empty_names=True)[source]

Return list of installed software from apt or rpm

Args:

keywords (str or list): string to lookup in key, display_name or publisher fields

Returns:
list of dicts: [{'key', 'name', 'version', 'install_date', 'install_location', 'uninstall_string', 'publisher', 'system_component'}]

setuphelpers.is32()[source]
setuphelpers.is64()[source]
setuphelpers.isARM()[source]
setuphelpers.isARM64()[source]
setuphelpers.isLinux64()
setuphelpers.is_debian()[source]
setuphelpers.is_debian_based()
setuphelpers.is_linux64()[source]
setuphelpers.is_redhat_based()
setuphelpers.is_rhel_based()
setuphelpers.is_valid_ipv4_address(address)[source]
setuphelpers.isdir(s)[source]

Return true if the pathname refers to an existing directory.

setuphelpers.isfile(path)[source]

Test whether a path is a regular file

setuphelpers.isodate2datetime(isodatestr)[source]
setuphelpers.isrunning(processname)[source]

Check if a process is running.

>>> isrunning('explorer')
True
setuphelpers.json_load_file(json_file, encoding='utf-8')[source]

Get content of a json file

Args:

json_file (str): path to the file

Returns:

dictionary (dict)

setuphelpers.json_write_file(json_file, data, indent=4, sort_keys=False, encoding='utf-8')[source]

Write dictionary to a json file

Args:

json_file (str): path to json file
data (dict): dictionary content
indent (int): indentation size; default: 4 spaces
sort_keys (bool): alphabetical order or not
encoding (str): file encoding
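
A round trip with the same parameters can be sketched with the json module. The names json_write_sketch and json_load_sketch are hypothetical stand-ins and do not come from setuphelpers.

```python
import io
import json


def json_write_sketch(json_file, data, indent=4, sort_keys=False, encoding='utf-8'):
    """Serialise data to json_file using the given encoding."""
    with io.open(json_file, 'w', encoding=encoding) as handle:
        json.dump(data, handle, indent=indent, sort_keys=sort_keys)


def json_load_sketch(json_file, encoding='utf-8'):
    """Return the decoded content of json_file."""
    with io.open(json_file, encoding=encoding) as handle:
        return json.load(handle)
```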

setuphelpers.killalltasks(process_names, include_children=True)[source]

Kill the tasks matching process_names

>>> killalltasks('firefox')
setuphelpers.killtree(pid, including_parent=True)[source]
setuphelpers.listening_sockets(low_ports=False, include_loc=True, kind='all')[source]
setuphelpers.local_drives()[source]
setuphelpers.local_group_members(groupname)[source]
setuphelpers.local_group_memberships(username)[source]

List the local groups of which a user is a member

setuphelpers.local_groups()[source]
setuphelpers.makepath(*p)[source]

Create a path from the components passed, with saner defaults than os.path.join. In particular, trailing path separators (backslashes) are removed from components. Path functions passed as components are called automatically.

>>> makepath("c:", "Windows", "system32")
'c:\\Windows\\system32'
>>> makepath(system32())
'C:\\WINDOWS\\system32'
>>> system32()
'C:\\WINDOWS\\system32'
>>> system32
<function system32 at 0x063EBE79>
>>> makepath(system32)
'C:\\WINDOWS\\system32'
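
The two behaviours described above (stripping trailing separators and calling function components) can be sketched as follows. The name makepath_sketch is hypothetical, and joining with os.sep is a simplification chosen for this example rather than the actual implementation.

```python
import os


def makepath_sketch(*parts):
    """Join path components, calling any callable component first and
    stripping trailing separators. Hypothetical stand-in."""
    cleaned = []
    for part in parts:
        if callable(part):
            part = part()  # path functions are resolved to their value
        cleaned.append(part.rstrip('/\\'))
    return os.sep.join(cleaned)
```
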
setuphelpers.mkdirs(path)[source]

Create directory path if it does not exist yet. Intermediate directories are created too.

>>> mkdirs(r"C:\Program Files (x86)\wapt")
u'C:\\Program Files (x86)\\wapt'
setuphelpers.networking()[source]

return a list of (iface,mac,{addr,broadcast,netmask})

setuphelpers.processes_for_file(filepath, open_files=True, dll=True)[source]

Generator returning processes that currently have an open file descriptor for filepath

If not running as the System account, system processes cannot be accessed.

Args:

filepath (str): file path or pattern (glob *)

Returns:

iterator psutil.Process

setuphelpers.purge_deb(deb_name)[source]
setuphelpers.remove_file(path)[source]

Try to remove a single file or symlink. Logs a warning message if the file does not exist, and a critical message if the file cannot be removed.

Args:

path (str): path to file

>>> remove_file(r'c:\tmp\fc.txt')
setuphelpers.remove_tree(*args, **kwargs)[source]

Convenience function to delete a directory tree. Errors are not ignored by default; pass ignore_errors=False to access possible errors.

Args:

path (str): path to directory to remove
ignore_errors (boolean): defaults to False. Set it to True to ignore exceptions on children deletion
onerror (func): hook called with (func, path, exc) on each delete exception. Should raise if stop is required.

setuphelpers.rsa_decrypt()

data, rsakeypem, password:bytes -> bytes return decrypted data

setuphelpers.rsa_encrypt()

data, x509certPEM: bytes -> bytes. returns encrypted data with public key in X509 certificate

setuphelpers.rsa_encrypt_data(txt, list_crt)[source]
setuphelpers.rsa_encrypted_data_str(txt, list_crt)[source]
setuphelpers.run(cmd, shell=True, timeout=600, accept_returncodes=[0, 3010], on_write=None, pidlist=None, return_stderr=True, **kwargs)[source]

Run the command cmd in a shell and return the output and error text as a string

Args:

cmd: command and arguments, either as a string or as a list of arguments
shell (boolean): True is assumed
timeout (int): maximum time to wait for cmd completion, in seconds (default = 600). A TimeoutExpired exception is raised if the timeout is reached.
on_write: callback func(unicode_line) called when a new line is printed on stdout or stderr by the subprocess. The argument is enforced to unicode.
accept_returncodes (list): list of return codes which are considered OK (default = (0, 3010))
pidlist (list): external list where to append the pid of the launched process.
return_stderr (bool or list): if True, the error lines are returned to the caller in the result. If a list is provided, the error lines are appended to this list.

All other parameters from the psutil.Popen constructor are accepted.

Returns:
RunOutput: bytes-like output of the stdout and optionally stderr streams, with a returncode attribute

Raises:

CalledProcessError: if the return code of cmd is not in the accept_returncodes list
TimeoutExpired: if the process is running for more than timeout time.

Changed in version 1.3.9: added the return_stderr parameter to disable stderr or get it in a separate list; the return value has a returncode attribute too

Changed in version 1.4.0: output is not forced to unicode

Changed in version 1.4.1: error code 1603 is no longer accepted by default.

Changed in version 1.5.1: If cmd is unicode, encode it to default filesystem encoding before running it.

>>> run(r'dir /B c:\windows\explorer.exe')
'explorer.exe\r\n'
>>> out = []
>>> pids = []
>>> def getlines(line):
...    out.append(line)
>>> run(r'dir /B c:\windows\explorer.exe',pidlist=pids,on_write=getlines)
u'explorer.exe\r\n'
>>> print(out)
['explorer.exe\r\n']
>>> try:
...     run(r'ping /t 127.0.0.1',timeout=3)
... except TimeoutExpired:
...     print('timeout')
timeout
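
The interplay between timeout and accept_returncodes can be sketched with subprocess from the standard library. The name run_sketch is hypothetical; it takes a list of arguments, merges stderr into stdout and omits on_write and pidlist, all of which are simplifications rather than the behaviour of setuphelpers.run.

```python
import subprocess
import sys


def run_sketch(cmd, timeout=600, accept_returncodes=(0, 3010)):
    """Run cmd (a list of arguments) and return its decoded output.

    Hypothetical stand-in built on subprocess.run; stderr is merged
    into stdout, which is a simplification.
    """
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=timeout,  # raises subprocess.TimeoutExpired when exceeded
    )
    if completed.returncode not in accept_returncodes:
        raise subprocess.CalledProcessError(
            completed.returncode, cmd, output=completed.stdout)
    return completed.stdout.decode(errors='replace')
```

Return codes outside accept_returncodes raise CalledProcessError in this sketch, mirroring the Raises section above.
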
setuphelpers.run_notfatal(*cmd, **args)[source]

Run the command and wait for its termination, then return its output. The exit status code of the command is ignored: on failure, '' is returned instead.

Changed in version 1.4.0: output is now enforced to unicode

setuphelpers.running_on_ac()[source]
setuphelpers.service_list()[source]
setuphelpers.shell_launch(cmd)[source]

Launch a command (without arguments) without waiting for its termination

>>> open('c:/tmp/test.txt','w').write('Test line')
>>> shell_launch('c:/tmp/test.txt')
setuphelpers.systemd_daemon_reload()[source]
setuphelpers.systemd_disable_service(servicename)[source]
setuphelpers.systemd_enable_service(servicename)[source]
setuphelpers.systemd_enable_start_service(servicename)[source]
setuphelpers.systemd_restart_service(servicename)[source]
setuphelpers.systemd_start_service(servicename)[source]
setuphelpers.systemd_status_service(servicename)[source]
setuphelpers.systemd_stop_service(servicename)[source]
setuphelpers.time2display(adatetime)[source]
setuphelpers.type_debian()[source]
setuphelpers.type_redhat()[source]
setuphelpers.uninstall_apt(package, autoremove=False)[source]

Remove package with APT

setuphelpers.uninstall_yum(package)[source]
setuphelpers.unzip(zipfn, target=None, filenames=None, extract_with_full_paths=True)[source]

Unzip the files from zipfile with patterns in filenames to target directory

Args:

zipfn (str): path to zipfile (can be relative to temporary unzip location of package)
target (str): target location. Defaults to dirname(zipfile) + basename(zipfile)
filenames (str or list of str): list of filenames / glob patterns (path sep is normally a slash)
extract_with_full_paths (bool): whether to keep the subfolders of the archive as is

Returns:

list : list of extracted files

>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt')
['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi',
 'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi',
 'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py',
 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control',
 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj',
 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256',
 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames=['*.msi','*.py'])
['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi',
 'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi',
 'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target='test', filenames=['*.msi','*.py'])
['C:\\example\\test\\7z920-x64.msi',
 'C:\\example\\test\\7z920.msi',
 'C:\\example\\test\\setup.py']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/*')
['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control',
 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj',
 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256',
 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control')
['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT\\control']
>>> unzip('tis-7zip_9.2.0-15_all.wapt', target='.', filenames='WAPT/control')
['.\\WAPT\\control']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target='C:\\example\\', filenames='WAPT/control')
['C:\\example\\WAPT\\control']
>>> unzip('tis-7zip_9.2.0-15_all.wapt', target=basedir, filenames='WAPT/control')
['C:\\example\\WAPT\\control']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control', extract_with_full_paths=False)
['C:\\example\\control']

New in version 1.3.11.

Changed in version 2.2: added extract_with_full_paths parameter
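
Pattern filtering and the extract_with_full_paths switch can be sketched with zipfile and fnmatch. The name unzip_sketch is hypothetical, target is mandatory here, and member names are trusted as-is (no protection against ".." components), so this is an illustration only and not the setuphelpers implementation.

```python
import fnmatch
import os
import zipfile


def unzip_sketch(zipfn, target, filenames=None, extract_with_full_paths=True):
    """Extract members of zipfn matching filenames into target and
    return the list of written paths. Hypothetical stand-in."""
    if isinstance(filenames, str):
        filenames = [filenames]
    extracted = []
    with zipfile.ZipFile(zipfn) as archive:
        for member in archive.namelist():
            if member.endswith('/'):
                continue  # directory entry, nothing to write
            if filenames and not any(fnmatch.fnmatch(member, p) for p in filenames):
                continue
            # Keep the archive sub folders, or flatten to the bare file name.
            relative = member if extract_with_full_paths else os.path.basename(member)
            destination = os.path.join(target, *relative.split('/'))
            os.makedirs(os.path.dirname(destination), exist_ok=True)
            with archive.open(member) as source, open(destination, 'wb') as out:
                out.write(source.read())
            extracted.append(destination)
    return extracted
```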

setuphelpers.update_apt()[source]
setuphelpers.update_yum()[source]
setuphelpers.upgrade_apt()[source]
setuphelpers.upgrade_yum()[source]
setuphelpers.user_appdata()

Return the local appdata profile of current user

Returns:

str: path like u’/home/user/.config/’

setuphelpers.user_home_directory()[source]
setuphelpers.user_local_appdata()[source]

Return the local appdata profile of current user

Returns:

str: path like u’/home/user/.config/’

setuphelpers.wget(url, target=None, printhook=None, proxies=None, connect_timeout=10, download_timeout=None, verify_cert=None, referer=None, user_agent=None, cert=None, resume=False, md5=None, sha1=None, sha256=None, cache_dir=None, requests_session=None, limit_bandwidth=None)[source]

Copy the contents of a file from a given URL to a local file.

Args:

url (str): URL to document
target (str): full file path of downloaded file. If None, put in a temporary dir with supplied url filename (final part of url)
proxies (dict): proxies to use, e.g. {'http':'http://wpad:3128','https':'http://wpad:3128'}
timeout (int): seconds to wait for answer before giving up
auth (list): (user,password) to authenticate with basic auth
verify_cert (bool or str): either False, True (verify with embedded CA list), or path to a directory or PEM encoded CA bundle file to check https certificate signature against.
cert (list): tuple/list of (x509certfilename,pemkeyfilename,key password) for authenticating the client. If key is not encrypted, password must be None
referer (str):
user_agent:
resume (bool):
md5 (str):
sha1 (str):
sha256 (str):
cache_dir (str): if file exists here, and md5 matches, copy from here instead of downloading. If not, put a copy of the file here after downloading.
requests_session (requests.Session): predefined requests session to use instead of building one from scratch from proxies, cert, verify_cert

Returns:

str : path to downloaded file

>>> respath = wget('http://wapt.tranquil.it/wapt/tis-firefox_28.0.0-1_all.wapt','c:\\tmp\\test.wapt',proxies={'http':'http://proxy:3128'})
???
>>> os.stat(respath).st_size>10000
True
>>> respath = wget('http://localhost:8088/runstatus','c:\\tmp\\test.json')
???
setuphelpers.wgets(url, proxies: Optional[dict] = None, verify_cert=None, referer=None, user_agent=None, timeout=None, cert=None, requests_session=None, as_json=False) -> str[source]

Return the content of a remote resource as a string / bytes or dict with a http get request.

Raise an exception if remote data can’t be retrieved.

Args:

url (str): http(s) url
proxies (dict): proxy configuration as requests requires it {'http': url, 'https': url}
verify_cert (bool or str): verify server certificate, path to trusted CA bundle
cert (tuple of 3 str): (cert_path, key_path, key password) client side authentication.
requests_session (requests.Session): predefined requests session to use instead of building one from scratch

Returns:

str or bytes or dict : content of remote resource. str or bytes or json depending on the encoding and the Content-Type.

>>> data = wgets('https://wapt/ping')
>>> "msg" in data
True

PackageEntry Class

Version Class

class setuphelpers.Version(version, members_count=None)[source]

Version object of the form 0.0.0. Comparisons follow natural numbering rather than alphabetical order.

Args:

version (str): version string
members_count (int): number of version members to take into account. If the version has fewer members, missing members are added with value 0. If it has more, the last ones are removed.

>>> Version('0.10.2') > Version('0.2.5')
True
>>> Version('0.1.2') < Version('0.2.5')
True
>>> Version('0.1.2') == Version('0.1.2')
True
>>> Version('7') < Version('7.1')
True

Changed in version 1.6.2.5: truncate version members list to members_count if provided.
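
Natural (numeric) ordering and the members_count padding and truncation can be sketched as below. The class name VersionSketch is hypothetical; it only handles purely numeric members, and treating a missing member as 0 during comparison is an assumption.

```python
import functools


@functools.total_ordering
class VersionSketch(object):
    """Compare dotted versions member by member. Hypothetical
    stand-in; only purely numeric members are handled."""

    def __init__(self, version, members_count=None):
        members = [int(part) for part in str(version).split('.')]
        if members_count is not None:
            # Pad with zeros, then drop any members beyond members_count.
            members = (members + [0] * members_count)[:members_count]
        self.members = members

    def _aligned(self, other):
        # Pad the shorter list with zeros so that 7 compares like 7.0.
        width = max(len(self.members), len(other.members))
        mine = self.members + [0] * (width - len(self.members))
        theirs = other.members + [0] * (width - len(other.members))
        return mine, theirs

    def __eq__(self, other):
        mine, theirs = self._aligned(other)
        return mine == theirs

    def __lt__(self, other):
        mine, theirs = self._aligned(other)
        return mine < theirs
```

Comparing integers rather than strings is what makes 0.10.2 rank above 0.2.5.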

sortable_str()[source]

Output a str suitable for direct ordering. Numeric members are converted to hexadecimal strings padded with zeros on the left. If a member is not a number, it is padded with spaces on the right to an 8-character string.

Setupdevhelpers

setupdevhelpers.add_double_quotes_around(string)[source]

Return the string with double quotes around

Args:

string (str): a string

setupdevhelpers.bs_find(url, element, attribute=None, value=None, user_agent=None, proxies=None, features='html.parser', **kwargs)[source]

Parse html web page with BeautifulSoup and get the first result

Args:

url (str): url of the web page to parse
element (str): searched element
attribute (str): selected attribute of the element
value (str): value of the selected attribute
user_agent (str): specify a user-agent if needed
proxies (dict): specify your proxy if needed
**kwargs (str): additional parameters passed to requests
features (str): bs feature to use

>>> bs_find('https://www.w3.org/', 'a', 'title', 'Open Web Platform testing')['href']
'https://web-platform-tests.org/'
>>> bs_find('https://www.w3.org/', 'span', 'class', 'alt-logo').string
'W3C'

New in version 2.0.

setupdevhelpers.bs_find_all(url, element, attribute=None, value=None, user_agent=None, proxies=None, features='html.parser', **kwargs)[source]

Parse html web page with BeautifulSoup and get a list of the result

Args:

url (str): url of the web page to parse
element (str): searched element
attribute (str): selected attribute of the element
value (str): value of the selected attribute
user_agent (str): specify a user-agent if needed
proxies (dict): specify your proxy if needed
**kwargs (str): additional parameters passed to requests
features (str): bs feature to use

>>> bs_find_all('https://www.w3.org/', 'a', 'title', 'Open Web Platform testing')[0]['href']
'https://web-platform-tests.org/'
>>> bs_find_all('https://www.w3.org/', 'span', 'class', 'alt-logo')[0].string
'W3C'

New in version 2.0.

setupdevhelpers.get_proxies_from_wapt_console()[source]

Return proxy information from the current user WAPT console

>>> get_proxies_from_wapt_console()
{'http': 'http://srvproxy.ad.domain.lan:8080',
'https': 'http://srvproxy.ad.domain.lan:8080'}
setupdevhelpers.remove_outdated_binaries(version, list_extensions=['exe', 'msi', 'deb', 'rpm', 'dmg', 'pkg'], filename_contains=None)[source]

Remove files based on the version contained in their filename, falling back on the file version on compatible OSes

Args:

version (str): version number of the files to keep
list_extensions (str or list of str): file extensions of compared files
filename_contains (str or list of str): part of the filename that must be contained (useful for distinguishing architecture and OS)

Returns:

list: list of deleted files

New in version 2.0.

Changed in version 2.2: Now returns removed files, now checking .exe and .msi file versions

setupdevhelpers.unzip_with_7zip(filename, target=None, filenames=[], extract_with_full_paths=True, recursive=True)[source]

Extract the files from an archive file with 7zip with patterns in filenames to target directory

Args:

filename (str): path to archive file (can be relative to temporary unzip location of package)
target (str): archive content to target dir location (dir will be created if needed). Default: dirname(archive file) + basename(archive file)
filenames (str or list of str): list of filenames / glob patterns (path sep is normally a slash)
extract_with_full_paths (bool): whether to keep the subfolders of the archive as is
recursive (bool): whether to look for filenames recursively

Returns:

None

New in version 2.0.

Changed in version 2.2: changed default target location to make it correspond with unzip()

Waptpackage

exception waptpackage.EWaptBadControl[source]

Bases: EWaptException

exception waptpackage.EWaptBadPackageAttribute[source]

Bases: EWaptException

exception waptpackage.EWaptBadSetup[source]

Bases: EWaptException

exception waptpackage.EWaptBadSignature[source]

Bases: EWaptException

exception waptpackage.EWaptConfigurationError[source]

Bases: EWaptException

exception waptpackage.EWaptConflictingPackage(msg, install_status='ERROR', retry_count=None)[source]

Bases: EWaptInstallError

exception waptpackage.EWaptCorruptedFiles[source]

Bases: EWaptException

exception waptpackage.EWaptDiskSpace[source]

Bases: EWaptException

exception waptpackage.EWaptDownloadError[source]

Bases: EWaptException

exception waptpackage.EWaptException[source]

Bases: Exception

exception waptpackage.EWaptInstallError(msg, install_status='ERROR', retry_count=None)[source]

Bases: EWaptException

Exception raised during installation of package. msg is logged in the local install database. If retry_count is None, install will be retried indefinitely until success; otherwise install is retried at most retry_count times.

exception waptpackage.EWaptInstallPostponed(msg, install_status='POSTPONED', retry_count=5, grace_delay=3600)[source]

Bases: EWaptInstallError

exception waptpackage.EWaptMissingLocalWaptFile[source]

Bases: EWaptException

exception waptpackage.EWaptNeedsNewerAgent[source]

Bases: EWaptException

exception waptpackage.EWaptNotAPackage[source]

Bases: EWaptException

exception waptpackage.EWaptNotSigned[source]

Bases: EWaptException

exception waptpackage.EWaptNotSourcesDirPackage[source]

Bases: EWaptException

exception waptpackage.EWaptPackageSignError[source]

Bases: EWaptException

exception waptpackage.EWaptUnavailablePackage(msg, install_status='ERROR', retry_count=None)[source]

Bases: EWaptInstallError

class waptpackage.HostCapabilities(from_dict=None, **kwargs)[source]

Bases: BaseObjectClass

as_dict()[source]
fingerprint()[source]
get_package_request_filter()[source]

Returns a filter for package search in repositories

Returns:

PackageRequest

has_matching_target(package_entry)[source]
is_matching_package(package_entry, for_datetime=None)[source]

Check if package_entry is matching the current capabilities and restrictions

class waptpackage.PackageEntry(package='', version='0', repo='', waptfile=None, section='base', **kwargs)[source]

Bases: BaseObjectClass

Manage package attributes coming from either control files in WAPT package, local DB, or development dir.

Methods to build, unzip, sign or check a package. Methods to sign the control attributes and check them.

>>> pe = PackageEntry('testgroup','0')
>>> pe.depends = 'tis-7zip'
>>> pe.section = 'group'
>>> pe.description = 'A test package'
>>> print(pe.ascontrol())
package           : testgroup
version           : 0
architecture      : all
section           : group
priority          : optional
maintainer        :
description       : A test package
depends           : tis-7zip
conflicts         :
maturity          :
locale            :
min_wapt_version  :
sources           :
installed_size    :
signer            :
signer_fingerprint:
signature_date    :
signed_attributes :
>>>
add_conflicts(to_add)[source]
add_depends(to_add)[source]
property all_attributes
as_control_bytes()[source]

Return this package entry metadata as bytes as saved in package control file

Return:

bytes: lines with key: value

as_dict()[source]
as_key()[source]
as_package_request()[source]
as_zipfile(mode='r')[source]

Return a CustomZipFile for this package for read only operations

ascontrol(with_repo_attributes=False, with_empty_attributes=False)[source]

Return control attributes and values as stored in control packages file

Each attribute is on its own line, as key : value. If a value is multiline, each continuation line begins with a space.

Args:

with_repo_attributes (bool): if True, include md5sum and filename (for Packages index only)

with_empty_attributes (bool): whether to include attributes with an empty value too, or only non-empty and/or signed attributes

Returns:

str: lines of attr: value
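The line layout described for ascontrol() can be sketched as below. This is an illustration only: `format_control_sketch` is a hypothetical helper, and the 18-character key column is inferred from the sample output in the PackageEntry example, not taken from the WAPT source.

```python
def format_control_sketch(attributes, with_empty_attributes=False, width=18):
    """Render a dict of control attributes as 'key : value' lines."""
    lines = []
    for key, value in attributes.items():
        value = '' if value is None else str(value)
        if not value and not with_empty_attributes:
            continue
        # Continuation lines of a multiline value start with a single space.
        value = value.replace('\n', '\n ')
        lines.append(('%s: %s' % (key.ljust(width), value)).rstrip())
    return '\n'.join(lines)


print(format_control_sketch({
    'package': 'testgroup',
    'description': 'A test package\nwith a second line',
    'maintainer': '',
}))
```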

asrequirement()[source]

Return package and version for designating this package in depends or install actions

Returns:

str: “packagename (=version)”

build_management_package(target_directory=None)[source]

Build the WAPT package from attributes only, without setup.py, and store the result in target_directory.

self.sourcespath must be None. Package will contain only a control file.

Args:
target_directory (str): where to create Zip wapt file.

if None, temp dir will be used.

Returns:

str: path to zipped Wapt file. It is unsigned.

>>> pe = PackageEntry('testgroup','0',description='Test package',maintainer='Hubert',sources='https://dev/svn/testgroup',architecture='x86')
>>> waptfn = pe.build_management_package()
>>> key = SSLPrivateKey('c:/private/htouvet.pem',password='monmotdepasse')
>>> crt = SSLCertificate('c:/private/htouvet.crt')
>>> pe.sign_package(crt,key)
>>> pe.unzip_package()
'c:\users\htouvet\appdata\local\temp\waptob4gcd'
>>> ca = SSLCABundle('c:/wapt/ssl')
>>> pe.check_control_signature(ca)
<SSLCertificate cn=u'htouvet' issuer=u'tranquilit-ca-test' validity=2017-06-28 - 2027-06-26 Code-Signing=True CA=True>
build_manifest(exclude_filenames=None, block_size=1048576, forbidden_files=[], waptzip=None, excludes=[])[source]

Calc the manifest of a wapt package

Args:
forbidden_files (list): list of relative files which must not be present for the manifest to be built

(if one is found, build fails)

exclude_filenames (list): list of exact file paths (relative to package root, with forward slashes) to exclude from manifest.

excludes (list): list of file / dir patterns to exclude, whatever level they are in the file hierarchy

Returns:

dict: {filepath:shasum,}
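A manifest of this shape can be sketched with the standard library. The function below is a hypothetical illustration, not the WAPT code: it assumes SHA-256 digests (suggested by the manifest.sha256 filename) and uses `fnmatch` for the `excludes` patterns.

```python
import fnmatch
import hashlib
import os


def build_manifest_sketch(root, exclude_filenames=None, excludes=None,
                          block_size=2 ** 20):
    """Return {relative_path: sha256_hexdigest} for the files under root."""
    exclude_filenames = set(exclude_filenames or [])
    excludes = list(excludes or [])
    manifest = {}
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune directories matching an exclude pattern, at any level.
        dirnames[:] = [d for d in dirnames
                       if not any(fnmatch.fnmatch(d, p) for p in excludes)]
        for name in filenames:
            if any(fnmatch.fnmatch(name, p) for p in excludes):
                continue
            full = os.path.join(dirpath, name)
            # Manifest keys are relative to the root, with forward slashes.
            rel = os.path.relpath(full, root).replace(os.sep, '/')
            if rel in exclude_filenames:
                continue
            digest = hashlib.sha256()
            with open(full, 'rb') as f:
                # Hash in blocks so large files are not loaded at once.
                for block in iter(lambda: f.read(block_size), b''):
                    digest.update(block)
            manifest[rel] = digest.hexdigest()
    return manifest
```

Reading in `block_size` chunks keeps memory use flat for large installers, which is presumably why the real method exposes the same parameter.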

build_package(excludes=[], target_directory=None, excludes_full=['.svn', '.git', 'setup.pyc', 'update_package.pyc', '__pycache__/setup.cpython-38.pyc', '__pycache__/update_package.cpython-38.pyc'])[source]

Build the WAPT package and store the result in target_directory. Zip the content of the self.sourcespath directory into a zipfile named with the default package filename based on control attributes.

Update filename attribute. Update localpath attribute with result filepath.

Args:

excludes (list): list of patterns for source files to exclude from built package.

target_directory (str): target directory where to store built package. If None, use parent directory of package sources directory.

excludes_full (list): list of exact file paths (relative to package root) to exclude from Zip.

Returns:

str: full filepath to built wapt package

call_setup_hook(hook_name='session_setup', wapt_context=None, params=None, force=None, user=None)[source]

Calls a hook in setup.py given a wapt_context

Set basedir, control, and run context within the function context.

Args:

hook_name (str): name of function to call in setup.py

wapt_context (Wapt): run context

Returns:

output of hook.

Changes:

1.6.2.1: the called hook is run with Disabled win6432 FileSystem redirection

change_depends_conflicts_prefix(new_prefix)[source]

Change prefix of package name to new_prefix in depends and conflicts csv lists and return True if it was really changed.

Args:

new_prefix (str): new prefix to put in package names

Returns:

bool

change_prefix(new_prefix)[source]

Change prefix of package name to new_prefix and return True if it was really changed.

check_control_signature(trusted_bundle, signers_bundle=None)[source]

Check in memory control signature against a list of public certificates

Args:

trusted_bundle (SSLCABundle): trusted certificates. Package certificates must be signed by one of this bundle.

signers_bundle: optional. List of potential package signers’ certificate chains. When checking the Packages index, actual packages are not available, only the certificates embedded in the Packages index. Package signatures are checked against these certificates, looking here for potential intermediate CAs too, and the matching certificate is checked against trusted_bundle.

Returns:

SSLCertificate : matching trusted package’s signers SSLCertificate

>>> from waptpackage import *
>>> from common import SSLPrivateKey,SSLCertificate
>>> k = SSLPrivateKey('c:/private/test.pem')
>>> c = SSLCertificate('c:/private/test.crt')
>>> p = PackageEntry('test',version='1.0-0')
>>> p.depends = 'test'
>>> p._sign_control(k,c)
>>> p.check_control_signature(c)
>>> p.check_control_signature(SSLCABundle('c:/wapt/ssl'))
check_package_attributes(remove_min_max_os=False)[source]
check_package_signature(trusted_bundle, ignore_missing_files=False)[source]

Check:
  • hash of files in unzipped package_dir against the list in package’s manifest file
  • try to decrypt manifest signature with package’s certificate
  • check that the package certificate is issued by a known CA or is the same as one of the authorized certificates.

Args:

trusted_bundle (SSLCABundle): local certificates store. Certificates in trusted_bundle.trusted_certificates are trusted.

ignore_missing_files (bool): whether to raise exception for missing files. Useful to check stripped down packages when remote resigning

Returns:

SSLCertificate : matching certificate

Raises:

Exception if no certificate match is found.

delete_localsources()[source]

Remove the unzipped local directory

fingerprint()[source]
get(name, default=None)[source]

Get PackageEntry property.

Args:

name (str): property to get. name is forced to lowercase.

default (any): value to return in case the property does not exist.

Returns:

any : property value

get_default_signed_attributes()[source]
get_impacted_process_list()[source]

Return a list containing the impacted process

Returns:

List[str] impacted process list

get_localized_description(locale_code=None)[source]

locale_code is a 2 chars code like fr or en or de

get_manifest_filename()[source]
get_signature_filename()[source]
get_software_version()[source]

Return the software version only (without the build number of the package)

Returns:

str: “software_version”

get_stripped_package()[source]

Build a package keeping only Wapt stuff… Keeps only WAPT directory and setup.py

Returns:

bytes: zipped data

get_values_for_db(locale_code)[source]
has_file(fname)[source]

Return None if fname is not in package, else return file datetime

Args:

fname (unicode): file path like WAPT/signature

Returns:

datetime : last modification datetime of file in Wapt archive if zipped or local sources if unzipped

has_setup_py()[source]
inc_build()[source]

Increment last number part of version in memory

invalidate_signature()[source]

Remove all signature information from control and unzipped package directory. Package must be in unzipped state.

list_corrupted_files(ignore_missing_files=False, remove_extra_files=False)[source]

Check hexdigest sha for the files in manifest. Package must be already unzipped.

Returns:

list: non matching files (corrupted files)
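The check amounts to recomputing each digest and comparing it with the manifest. A minimal sketch, assuming SHA-256 digests and a manifest given as a `{relative_path: hexdigest}` dict; `list_corrupted_sketch` is a hypothetical name and does not reproduce the `remove_extra_files` option:

```python
import hashlib
import os


def list_corrupted_sketch(root, manifest, ignore_missing_files=False):
    """Return the manifest paths whose file content does not match its digest."""
    corrupted = []
    for rel, expected in manifest.items():
        full = os.path.join(root, *rel.split('/'))
        if not os.path.isfile(full):
            # A missing file counts as corrupted unless explicitly ignored.
            if not ignore_missing_files:
                corrupted.append(rel)
            continue
        with open(full, 'rb') as f:
            actual = hashlib.sha256(f.read()).hexdigest()
        if actual != expected:
            corrupted.append(rel)
    return corrupted
```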

load_control_from_dict(adict)[source]

Fill in members of entry with keys from supplied dict

adict members which are not a registered control attribute are set too and attribute name is put in list of “calculated” attributes.

Args:

adict (dict): key,value to put in this entry

Returns:

PackageEntry: self

load_control_from_wapt(fname=None, calc_md5=False, keep_control_lines=False)[source]

Load package attributes from the control file (utf8 encoded) included in WAPT zipfile fname

Args:
fname (str or unicode): Package file/directory path

If None, try to load entry attributes from self.sourcespath or self.localpath. If fname is a file path, it must be a WAPT zipped file, and control data is loaded from it. If fname is a directory path, it must be the root dir of an unzipped package, and control is loaded from <fname>/WAPT/control.

calc_md5 (boolean): if True and fname is a zipped file, initialize md5sum attribute with md5 part of filename or calc from Zipped file

Returns:

PackageEntry: self

local_attributes = ['sourcespath', 'repo', 'localpath', 'repo_url']
make_fallback_uuid()[source]
make_package_edit_directory()[source]

Return the standard package directory to edit the package based on current attributes

Returns:
str: standard package filename
  • {package}_{version}_{architecture}_{OS}_{Min-OS-Version}_{Max-OS-Version}_{maturity}_{locale}-wapt

make_package_filename(with_md5sum=False)[source]

Return the standard package filename based on current attributes. Parts of control which are either ‘all’ or empty are not included in the filename.

Returns:
str: standard package filename
  • packagename.wapt for host

  • packagename_arch_maturity_locale.wapt for group

  • packagename_version_arch_maturity_locale.wapt for others
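The three patterns listed above can be sketched as follows. `make_filename_sketch` is a hypothetical helper that covers only the parts named in this list; the real method may take more control attributes into account.

```python
def make_filename_sketch(package, version='', architecture='all',
                         maturity='', locale='', section='base'):
    """Build a package filename, skipping parts that are 'all' or empty."""
    if section == 'host':
        return package + '.wapt'
    parts = [package]
    if section != 'group':
        # Group packages leave the version out of the filename.
        parts.append(version)
    parts.extend([architecture, maturity, locale])
    kept = [part for part in parts if part and part != 'all']
    return '_'.join(kept) + '.wapt'


print(make_filename_sketch('tis-firefox', '37.0.2-9', 'x64', 'PROD', 'fr'))
print(make_filename_sketch('pc-accounting', '3', section='host'))
```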

make_uuid()[source]
manifest_filename_excludes = ['WAPT/signature', 'WAPT/signature.sha256', 'WAPT/manifest.sha256']
match(match_expr)[source]

Return True if package entry matches a package string like ‘tis-package (>=1.0.1-00)’

Check if entry match search words

Args:

search (str): words to search for separated by spaces

Returns:

boolean: True if entry contains the words in search in correct order and at word boundaries

match_version(version_expr)[source]

Return True if package entry match a version string condition like ‘>=1.0.1-00’

merge_stripped_package(stripped_package_zip_data=None, stripped_package_filename=None)[source]

Use the files from stripped_package_zip_data and include it in current unzipped package, remove files not in manifest, and recheck file hashes.

not_duplicated_attributes = ['signer', 'signer_fingerprint', 'signature', 'signature_date', 'signed_attributes']
optional_attributes = ['name', 'categories', 'maintainer', 'description', 'depends', 'conflicts', 'maturity', 'locale', 'target_os', 'min_wapt_version', 'sources', 'installed_size', 'impacted_process', 'description_fr', 'description_pl', 'description_de', 'description_es', 'description_pt', 'description_it', 'description_nl', 'description_ru', 'audit_schedule', 'editor', 'keywords', 'licence', 'homepage', 'package_uuid', 'valid_from', 'valid_until', 'forced_install_on', 'changelog', 'min_os_version', 'max_os_version', 'icon_sha256sum']
package_certificates()[source]

Return certificates from package. If package is built, take it from Zip else take the certificates from unzipped directory

Returns:
list: list of embedded certificates when package was signed or None if not provided or signed.

First one of the list is the signer, the others are optional intermediate CA

package_ident()[source]

Version independent package key

Returns:

tuple

parse_capabilities_target_os(check=False)[source]
parse_version()[source]

Parse version to major, minor, patch, pre-release, build parts.

remove_conflicts(to_remove)[source]
remove_depends(to_remove)[source]
repo_attributes = ['filename', 'size', 'md5sum']
required_attributes = ['package', 'version', 'architecture', 'section', 'priority']
save_control_to_wapt(fname=None, force=True)[source]

Save package attributes to the control file (utf8 encoded)

Update self.localpath or self.sourcespath if not already set.

Args:
fname (str): base directory of waptpackage or filepath of zipped package.

If None, use self.sourcespath if exists, or self.localpath if exists

force (bool) : write control in wapt zip file even if it already exist

Returns:

PackageEntry : None if nothing written, or previous PackageEntry if new data written

Raises:

Exception: if fname is None and no sourcespath and no localpath

Exception: if control exists and force is False

set_icon_sha256sum()[source]
set_software_version(version, inc_build=False)[source]

Set the software version only. If inc_build is True, the build number is incremented.

sign_package(certificate, private_key, keep_signature_date=False, excludes_full=['.svn', '.git', 'setup.pyc', 'update_package.pyc', '__pycache__/setup.cpython-38.pyc', '__pycache__/update_package.cpython-38.pyc'], excludes=[])[source]

Sign a package source directory or an already built (zipped) package. Should follow immediately the build_package step.

Append signed control, manifest.sha256 and signature to zip wapt package. If these files are already in the package, they are first removed.

Use the self.localpath attribute to get location of waptfile build file.

Args:

certificate (SSLCertificate or list): signer certificate chain

private_key (SSLPrivateKey): signer private key

keep_signature_date (bool): if True, the previous date of signature is kept (useful when resigning is needed, but no meaningful change has been done)

excludes_full (list): list of exact file paths (relative to package root) to exclude from manifest.

excludes (list): list of file / dir patterns to exclude, whatever level they are in the file hierarchy

Returns:

str: signature

sign_stripped_package(certificate, private_key, excludes_full=['.svn', '.git', 'setup.pyc', 'update_package.pyc', '__pycache__/setup.cpython-38.pyc', '__pycache__/update_package.cpython-38.pyc'], excludes=[], sign_setuppy=False)[source]

Sign an unzipped source package, assuming digests in manifest file are OK except for control and certificate.crt:
  • remove signature files -> WAPT/certificate.crt, WAPT/signature.sha256
  • resign WAPT/control
  • update control hash in WAPT/manifest.sha256
  • resign WAPT/manifest.sha256 and put in WAPT/signature.sha256

Args:

certificate (list of SSLCertificate): certificates chain of the signer. First certificate is the signer’s one, others are intermediate CA

private_key (SSLPrivateKey): key to sign the control and manifest files.

excludes_full

excludes

Returns:

str: signature of manifest.sha256

signature_attributes = ['signer', 'signer_fingerprint', 'signature', 'signature_date', 'signed_attributes']
sortable_version()[source]
property tags
property target_os_list
unzip_package(target_dir=None, cabundle=None, ignore_missing_files=False)[source]

Unzip package and optionally check content

Args:

target_dir (str): where to unzip package content. If None, a temp dir is created

cabundle (list): list of Certificates to check content. If None, no check is done

Returns:

str : path to unzipped packages files

Raises:

EWaptNotAPackage, EWaptBadSignature, EWaptCorruptedFiles: if the check is not successful. Unzipped files are then deleted.

class waptpackage.PackageRequest(request=None, copy_from=None, **kwargs)[source]

Bases: BaseObjectClass

Package and version request / condition. The request is the basic packagename(=version) request. Additional filters can be specified as arguments. The list filters are ordered from most preferred to least preferred options.

Args:

request (str): packagename(<=>version)

architectures (list): list of x64, x86, arm, arm64, armhf

locales (list): list of 2 letters lki

property architectures

List of accepted architectures

as_dict()[source]
fingerprint()[source]
get_package_compare_key(pe1: PackageEntry) Tuple[source]

Compute a key for a package to compare it with others in the context of this request. This takes into account the preferences defined by filters, such as the order of locales, architectures, or maturities.

Args:

pe1 (PackageEntry)

Returns:

tuple

is_matched_by(package_entry: PackageEntry)[source]

Check if package_entry is matching this request

property locales: List
property maturities: List

List of accepted maturities

property max_os_version
property min_os_version
property package
property request
property tags: list

List of accepted tags for OS

property version
waptpackage.PackageVersion(package_or_versionstr) Tuple[source]

Splits a version string such as 1.2.3.4-567. The software version is clipped to 4 members. If ‘-packaging’ is not provided, the second member will be 0 for safe comparison.

Args:

package_or_versionstr (str): package version string

Returns:

tuple: (Version,int) : soft version on 4 members / packaging as an int
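The split can be sketched with plain tuples. `split_version_sketch` is a hypothetical helper: it represents the software version as a tuple of ints instead of a Version object, and it pads short versions with zeros, which is an assumption of this sketch and not something stated above.

```python
def split_version_sketch(versionstr, members_count=4):
    """Split '1.2.3.4-567' into ((1, 2, 3, 4), 567)."""
    soft, _, packaging = versionstr.partition('-')
    # Clip the software version to members_count members.
    members = [int(part) for part in soft.split('.')][:members_count]
    # Assumption: pad with zeros so that '1.2' and '1.2.0' compare equal.
    members += [0] * (members_count - len(members))
    # Packaging defaults to 0 when no '-packaging' suffix is present.
    return tuple(members), (int(packaging) if packaging else 0)


print(split_version_sketch('1.2.3.4-567'))
print(split_version_sketch('20.0.1'))
```

Because both members are comparable, the resulting tuples can be used directly as sort keys.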

waptpackage.PackageVersionStr(package_or_versionstr) Tuple[source]

Splits a version string such as 1.2.3.4-567. The software version is clipped to 4 members. If ‘-packaging’ is not provided, the second member will be 0 for safe comparison.

Args:

package_or_versionstr (str): package version string

Returns:

tuple: (Version,int) : soft version on 4 members / packaging as an int

class waptpackage.WaptBaseRepo(name='abstract', cabundle=None, config=None, section=None)[source]

Bases: BaseObjectClass

Base abstract class for a Wapt Packages repository

as_dict() dict[source]
property authorized_certificates: Sequence[dict]

List of authorized signers certificates attributes

Returns:

list [dict]

property cabundle
config_fingerprint()[source]
get(packagename, default=None)[source]
get_certificates(packages_zipfile=None)[source]

Download signers certificates and crl from Package index on remote repository.

These certificates and CRL are appended to Packages index when scanning packages.

Args:

packages_zipfile (zipfile): if None, download it from repo

Returns :

SSLCABundle or None if Packages does not exist

get_package_by_uuid(package_uuid: str) PackageEntry[source]
get_package_entries(packages_names)[source]

Return most up to date packages entries for packages_names. packages_names is either a list or a string.

Returns:

dict: a dictionary with {'packages': [], 'missing': []}

>>> r = WaptRemoteRepo()
>>> r.load_config_from_file('c:/wapt/wapt-get.ini')
>>> res = r.get_package_entries(['tis-firefox','tis-putty'])
>>> isinstance(res['missing'],list) and isinstance(res['packages'][0],PackageEntry)
True
invalidate_packages_cache()[source]

Reset the in-memory packages index. Returns the old content of the cache (packages, packages index date, discarded packages).

Returns:

dict : old cache status dict(_packages=self._packages,_packages_date=self._packages_date,discarded=self.discarded)

is_available()[source]

Return the isodate of the last update of the repo if it is available, else None

is_locally_allowed_package(package)[source]

Return True if package is not in the blacklist and is in the whitelist (if the whitelist is not None). packages_whitelist and packages_blacklist are lists of package name wildcards (file style wildcards). The blacklist is taken into account first, if defined. The whitelist is taken into account if not None; otherwise all package names that are not blacklisted are allowed.
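The blacklist-then-whitelist rule can be sketched with `fnmatch`, which implements file style wildcards. `is_allowed_sketch` is a hypothetical standalone function; the real method reads the two lists from the repository object.

```python
import fnmatch


def is_allowed_sketch(package_name, packages_whitelist=None,
                      packages_blacklist=None):
    """Apply the blacklist first, then the whitelist if one is defined."""
    if packages_blacklist is not None:
        if any(fnmatch.fnmatch(package_name, pattern)
               for pattern in packages_blacklist):
            return False
    if packages_whitelist is not None:
        return any(fnmatch.fnmatch(package_name, pattern)
                   for pattern in packages_whitelist)
    # No whitelist: everything that is not blacklisted is allowed.
    return True
```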

load_config(config=None, section=None)[source]

Load configuration from inifile section. Use the name of the repo as section name if section is not provided. Use ‘global’ if there is no section named section in the ini file. Values not defined in the ini file are taken from the class _default_config dict.

load_config is called at __init__, possibly with config = None. In this case, all parameters are initialized from defaults.

Args:

config (RawConfigParser): ini configuration

section (str): section where to load parameters. Defaults to name of repository

Returns:

self: return itself to chain calls.

load_config_from_file(config_filename, section=None)[source]

Load repository configuration from an inifile located at config_filename

Args:

config_filename (str): path to wapt inifile

section (str): ini section from which to get parameters. Defaults to repo name

Returns:

WaptBaseRepo: self

need_update(last_modified=None)[source]

Check if packages index has changed on repo and local index needs an update

Compare date on local package index DB with the Packages file on remote repository with a HEAD http request.

Args:

last_modified (str): iso datetime of last known update of packages.

Returns:

bool: True if either Packages was never read or remote date of Packages is more recent than the provided last_modified date.

>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> waptdb = WaptDB('c:/wapt/db/waptdb.sqlite')
>>> res = repo.need_update(waptdb.read_param('last-%s'% repo.url))
>>> isinstance(res,bool)
True
packages()[source]

Return list of packages, loading it from the repository if not yet available in memory. To force the reload, call invalidate_packages_cache() first, or update().

packages_date()[source]

Date of last known packages index

Returns:

str: date/time of Packages index in iso format (string)

packages_matching(package_cond: Optional[Union[str, PackageRequest]] = None, **kwargs) list[source]

Return a list of available package entries which match the condition “packagename[([=<>]version)]?”, ordered by ascending version

Args:

package_cond (str or PackageRequest): package name with optional version specifier.

Returns:

list of PackageEntry

>>> from waptpackage import *
>>> r = WaptRemoteRepo('http://wapt.tranquil.it/wapt')
>>> r.packages_matching('tis-firefox(>=20)')
[PackageEntry('tis-firefox','20.0.1-02'),
 PackageEntry('tis-firefox','21.0.0-00'),
 ...]
property public_certs_dir
property repo_url
search(searchwords=[], sections=[], newest_only=False, exclude_sections=[], description_locale=None, host_capabilities=None, package_request=None, dependencies_list=[])[source]
Return list of package entries with description or name matching all the searchwords and section in provided sections list

Args:

searchwords (list or csv): list of words to look up in description and package names

sections (list or csv): list of package sections to use when searching

newest_only (bool): returns only highest version of package

exclude_sections (list or csv): list of package sections to exclude when searching

description_locale (str): if not None, search in description using this locale

host_capabilities (HostCapabilities or dict): restrict output to these capabilities (os version, locales, arch etc.)

package_request (PackageRequest or dict): restrict output to these filters, and sort output based on them

Returns:

list of PackageEntry with additional _localized_description added if description_locale is provided

>>> r = WaptRemoteRepo(name='test',url='http://wapt.tranquil.it/wapt')
>>> r.search('test')
update()[source]

Update local index of packages from source index

Returns:

last packages update file date

class waptpackage.WaptLocalRepo(localpath=None, name='waptlocal', cabundle=None, config=None, section=None)[source]

Bases: WaptBaseRepo

Index of Wapt local repository.

Index of packages is located in a Packages zip file, having one Packages file, containing the concatenated content of “control” files of the packages.

A blank line means new package.

>>> localrepo = WaptLocalRepo('c:/wapt/cache')
>>> localrepo.update()
as_dict()[source]
is_available(url=None)[source]

Check if repo is reachable and return creation date of Packages.

Returns:

str: Iso creation date of remote Package file as returned in http headers

load_config(config=None, section=None)[source]

Load waptrepo configuration from inifile section.

Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file

Args:

config (RawConfigParser): ini configuration

section (str): section where to load parameters. Defaults to name of repository

Returns:

WaptRemoteRepo: return itself to chain calls.

property packages_path
property repo_url
update_packages_index(force_all=False, proxies=None, canonical_filenames=False, include_host_packages=False, include_certificates=True, include_crls=True, extract_icons=True)[source]

Scan self.localpath directory for WAPT packages and build a Packages (utf8) zip file with control data and MD5 hash.

Extract icons from packages (WAPT/icon.png) and store them in <repo path>/icons/<package name>.png. Extract certificate and add it to Packages zip file in ssl/<fingerprint.crt>. Append CRL for certificates.

Returns:

dict : {'processed': processed, 'kept': kept, 'errors': errors, 'packages_filename': packages_fname}

class waptpackage.WaptRemoteRepo(url=None, name='', verify_cert=None, http_proxy=None, timeout=None, cabundle=None, config=None, section=None)[source]

Bases: WaptBaseRepo

Gives access to a remote http repository, with a zipped Packages packages index

>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> last_modified = repo.is_available()
>>> isinstance(last_modified,str)
True
as_dict()[source]

returns a dict representation of the repository configuration and parameters

client_auth()[source]

Return SSL trio filenames for client side SSL auth

Returns:

tuple: (cert filename,key filename,key pwd)

download_icons(package_requests, target_dir=None, usecache=True, printhook=None)[source]

Download a list of icons from packages (requests are of the form packagename (>version)). Returns a dict with the keys “downloaded”, “skipped”, “errors”.

If package_requests is a list of PackageEntry, update localpath of entry to match downloaded file.

Args:

package_requests (list) : list of PackageEntry to download or list of package with optional version

Returns:

dict: ‘packages’, ‘downloaded’, ‘skipped’, ‘errors’

download_packages(package_requests, target_dir=None, usecache=True, printhook=None)[source]
Download a list of packages (requests are of the form packagename (>version)). Returns a dict with the keys “downloaded”, “skipped”, “errors”.

If package_requests is a list of PackageEntry, update localpath of entry to match downloaded file.

Args:

package_requests (list) : list of PackageEntry to download or list of package with optional version

Returns:

dict: ‘packages’, ‘downloaded’, ‘skipped’, ‘errors’

>>> repo = WaptRemoteRepo(url='http://wapt.tranquil.it/wapt')
>>> repo.download_packages(['tis-firefox','tis-waptdev'],printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
get_private_key_password(location, identity)[source]
get_requests_session(url=None, http_proxy=None)[source]

Returns a requests session object with optional ssl client side auth and proxies

Returns:

requests.Session

is_available(url=None, http_proxy=None)[source]

Check if repo is reachable and return creation date of Packages.

Try to access the repo and return last modified date of repo index or None if not accessible

Returns:

str: Iso creation date of remote Package file as returned in http headers

>>> repo = WaptRemoteRepo(name='main',url='https://wapt/wapt',timeout=1)
>>> repo.is_available() <= datetime2isodate()
True
>>> repo = WaptRemoteRepo(name='main',url='https://badwapt/wapt',timeout=1)
>>> repo.is_available() is None
True
load_config(config=None, section=None)[source]

Load waptrepo configuration from inifile section.

Use name of repo as section name if section is not provided.

Use ‘global’ if no section named section in ini file

Args:

config (RawConfigParser): ini configuration

section (str): section where to load parameters. Defaults to name of repository

Returns:

WaptRemoteRepo: return itself for chain calls.

packages_url(url=None)[source]

return url of Packages index file

>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> repo.packages_url
'http://wapt/wapt/Packages'

hardcoded path to the Packages index.

property proxies

dict for http proxies url suitable for requests based on the http_proxy repo attribute

Returns:

dict: {'http': 'http://proxy:port', 'https': 'http://proxy:port'}

property repo_url
waptpackage.control_to_dict(control, int_params=('size', 'installed_size'), out_excluded_control_keys=['filename', 'size', 'md5sum', 'repo_url', 'repo'], out_control_lines=None)[source]

Convert a control file like object, or a list of lines of the form key1: value1, key2: value2, …, into a dict

Continuation lines of multiline strings begin with a space

Breaks when an empty line is reached (limit between 2 package in Packages indexes)

Args:

control (file, str or list): file like object to read control from (until an empty line is reached)

int_params (list): attributes which must be converted to int

out_control_lines (list): if not None, decoded raw control lines are appended to this list.

Returns:

dict
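The parsing rules (key: value lines, continuation lines starting with a space, stop at the first empty line, int conversion) can be sketched as below. `control_to_dict_sketch` is a hypothetical simplification that ignores `out_excluded_control_keys` and `out_control_lines`.

```python
def control_to_dict_sketch(lines, int_params=('size', 'installed_size')):
    """Parse control lines into a dict, stopping at the first empty line."""
    result = {}
    key = None
    for raw in lines:
        line = raw.rstrip('\r\n')
        if not line.strip():
            # Empty line: boundary between two packages in a Packages index.
            break
        if line.startswith(' ') and key is not None:
            # Continuation of the previous (multiline) value.
            result[key] += '\n' + line[1:]
            continue
        # Split on the first colon only, so values may contain colons (URLs).
        key, _, value = line.partition(':')
        key = key.strip()
        result[key] = value.strip()
    for name in int_params:
        if result.get(name):
            result[name] = int(result[name])
    return result


sample = [
    'package : demo\n',
    'description : first line\n',
    ' second line\n',
    'size : 42\n',
    '\n',
    'package : next-package\n',
]
print(control_to_dict_sketch(sample))
```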

waptpackage.make_version(major_minor_patch_build)[source]
waptpackage.parse_major_minor_patch_build(version)[source]

Parse version to major, minor, patch, pre-release, build parts.

waptpackage.update_packages(adir, force=False, proxies=None, canonical_filenames=False)[source]

Helper function to update a local packages index

This function is used on repositories to rescan all packages and update the Packages index.

>>> if os.path.isdir(r'c:\wapt\cache'):
...     repopath = r'c:\wapt\cache'
... else:
...     repopath = '/var/www/wapt'
>>> p = PackageEntry()
>>> p.package = 'test'
>>> p.version = '10'
>>> new_package_fn = os.path.join(repopath,p.make_package_filename())
>>> if os.path.isfile(new_package_fn):
...     os.unlink(new_package_fn)
>>> res = update_packages(repopath)
>>> os.path.isfile(res['packages_filename'])
True
>>> r = WaptLocalRepo(localpath=repopath)
>>> l1 = r.packages()
>>> res = r.update_packages_index()
>>> l2 = r.packages()
>>> [p for p in l2 if p not in l1]
["test (=10)"]

Common

exception common.EWaptBadServerAuthentication[source]

Bases: EWaptException

exception common.EWaptCancelled[source]

Bases: Exception

class common.Wapt(config_filename=None, defaults=None, disable_update_server_status=True, wapt_base_dir=None, dbpath=None)[source]

Bases: BaseObjectClass

Global WAPT engine

add_hosts_repo() WaptHostRepo[source]

Add an automatic host repository, removing the last existing WaptHostRepo first

add_pyscripter_project(target_directory)[source]

Add a pyscripter project file to package development directory.

Args:

target_directory (str): path to location where to create the wapt.psproj file.

Returns:

None

add_upgrade_shutdown_policy()[source]

Add a local shutdown policy to upgrade system

add_vscode_project(target_directory)[source]

Add .vscode folder with project files to the package development directory

Args:

target_directory (str): path to the package development directory.

Returns:

None

as_dict()[source]
audit(package, force=False, audited_by=None) str[source]

Run the audit hook for the installed package. Source setup.py from database, filename, or PackageEntry. Stores the result and log into the “wapt_localstatus” table.

Args:

package (PackageEntry or directory or package name or {package_uuid} ): package to audit

Returns:

str : iso datetime of this audit as stored in packages status table

audit_data_expired(section, key, value)[source]

Check if the latest value associated with section/key is expired

Returns:

bool: True if data exists and has expired, or if data does not exist.
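
The return rule above can be illustrated with a minimal, self-contained sketch. It does not use the WAPT database; the record layout (a dict with an `expiration_date` key) and the function name are assumptions made for illustration only.

```python
from datetime import datetime, timedelta


def audit_data_expired_sketch(record, now=None):
    """Return True if the record is missing or its expiration date has passed.

    `record` is a hypothetical dict standing in for a stored audit value.
    """
    now = now or datetime.now()
    if record is None:
        # No data stored for this section/key: treated as expired.
        return True
    expiration = record.get('expiration_date')
    # In this sketch, a record without an expiration date never expires.
    return expiration is not None and expiration <= now


now = datetime(2024, 1, 1, 12, 0, 0)
fresh = {'value': 1, 'expiration_date': now + timedelta(hours=1)}
stale = {'value': 1, 'expiration_date': now - timedelta(hours=1)}
print(audit_data_expired_sketch(fresh, now), audit_data_expired_sketch(stale, now))
```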

authorized_certificates()[source]

Return a list of authorized package certificate issuers for this host. The check_certificates_validity setting enables date checking.

available_categories()[source]
build_package(directoryname, target_directory=None, excludes=[])[source]

Build the WAPT package from a directory

Args:

directoryname (str): source root directory of package to build

inc_package_release (boolean): increment the version of package in control file.

set_maturity (str): if not None, change package maturity to this. Can be something like DEV, PROD etc.

Returns:

str: Filename of built WAPT package

build_upload(sources_directories, private_key_passwd=None, wapt_server_user=None, wapt_server_passwd=None, inc_package_release=False, target_directory=None, set_maturity=None)[source]

Build a list of packages and upload the resulting packages to the main repository. If the section of a package is group or host, use the specific wapt-host or wapt-group.

Returns:

list: list of filenames of built WAPT package

property cabundle
call_python_code(python_filename, func_name, package_entry=None, force=None, params=None, working_dir=None, import_modules=[])[source]

Calls a function in python_filename. Set basedir, control, and run context within the function context.

Args:

python_filename : python filename with module to load.

func_name (str): name of function to call in setuppy

package_entry (PackageEntry): if not None, use it to set environment

Returns:

output of hook.

check_all_depends_conflicts()[source]

Check the whole dependencies/conflicts tree for installed packages

check_cancelled(msg='Task cancelled')[source]
check_depends(apackages, forceupgrade=False, force=False, assume_removed=[], package_request_filter=None)[source]

Given a list of package names or requirements “name (=version)”, return a dictionary with keys {‘additional’, ‘upgrade’, ‘install’, ‘skipped’, ‘unavailable’, ‘remove’}, each holding a list of [packagerequest, matching PackageEntry]

Args:

apackages (str or list): list of packages for which to check missing dependencies.

forceupgrade (boolean): if True, check if the current installed packages is the latest available

force (boolean): if True, install the latest version even if the package is already there and match the requirement

assume_removed (list): list of packagename which are assumed to be absent even if they are actually installed to check the consequences of removal of packages, implies force=True

package_request_filter (PackageRequest): additional filter to apply to packages to sort by locales/arch/mat preferences. If None, get active host filter

Returns:

dict : {‘additional’ ‘upgrade’ ‘install’ ‘skipped’ ‘unavailable’, ‘remove’} with list of [packagerequest,matching PackageEntry]

check_downloads(apackages=None, usecache=True)[source]

Return list of available package entries to match supplied packages requirements

Args:

apackages (list or str): list of packages

usecache (bool) : returns only PackageEntry not yet in cache

Returns:

list: list of PackageEntry to download

check_install(apackages=None, force=True, forceupgrade=True)[source]

Return a list of actions required to install the apackages list of packages. If apackages is None, check for all pending updates.

Args:

apackages (str or list): list of packages or None to check pending install/upgrades

force (boolean): if True, already installed package listed in apackages will be considered to be reinstalled

forceupgrade: if True, all dependencies are upgraded to latest version, even if current version comply with depends requirements

Returns:
dict: with keys [‘skipped’, ‘additional’, ‘remove’, ‘upgrade’, ‘install’, ‘unavailable’] and list of (package requirements, PackageEntry)
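
A caller typically needs to inspect the action dictionary described above. The following sketch works on a hand-built sample dict with the documented keys; the helper names and the sample values are hypothetical and nothing here calls WAPT itself.

```python
ACTION_KEYS = ['install', 'upgrade', 'additional', 'remove', 'skipped', 'unavailable']


def summarize_actions(result):
    """Count the (request, entry) pairs listed under each documented key."""
    return {key: len(result.get(key, [])) for key in ACTION_KEYS}


def has_pending_work(result):
    """True if at least one package would be installed, upgraded or removed."""
    return any(result.get(key) for key in ('install', 'upgrade', 'additional', 'remove'))


# Hand-built stand-in for a check_install() result; strings replace PackageEntry objects.
sample = {
    'install': [('tis-firefox', 'tis-firefox (=37.0.2-9)')],
    'upgrade': [],
    'additional': [('tis-7zip', 'tis-7zip (=16.4-8)')],
    'remove': [],
    'skipped': [('tis-putty', 'tis-putty (=0.70-1)')],
    'unavailable': [],
}
print(summarize_actions(sample), has_pending_work(sample))
```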

check_install_running(max_ttl=60)[source]

Check if an install is in progress and return the list of pids of installs in progress. Kill old stuck wapt-get processes/children and update db status. max_ttl is the maximum age of wapt-get in minutes.

check_remove(apackages)[source]

Return a list of additional package to remove if apackages are removed

Args:

apackages (str or list of req or PackageRequest): list of packages for which parent dependencies will be checked.

Returns:

list: list of PackageRequest with broken dependencies
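
The idea of checking parent dependencies can be sketched with a plain dict mapping each installed package name to its depends list. This is an illustration only: it looks at direct parents, the data is hypothetical, and whether the real method also follows indirect parents is not stated in this document.

```python
def broken_by_removal(removed, depends_map):
    """Return names of installed packages that directly depend on a removed one.

    depends_map: hypothetical dict {package name: list of package names it depends on}.
    """
    removed = {removed} if isinstance(removed, str) else set(removed)
    return sorted(
        name for name, deps in depends_map.items()
        if name not in removed and removed.intersection(deps)
    )


depends_map = {
    'tis-app': ['tis-lib'],
    'tis-lib': [],
    'tis-other': [],
}
print(broken_by_removal('tis-lib', depends_map))
```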

cleanup(obsolete_only=False)[source]

Remove cached WAPT files from local disk

Args:
obsolete_only (boolean): If True, remove packages which are either no longer available, or installed at an equal or newer version

Returns:

list: list of filenames of removed packages

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> l = wapt.download_packages(wapt.check_downloads())
>>> res = wapt.cleanup(True)
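
The obsolete_only rule can be sketched as follows. This is a simplified illustration under stated assumptions: packages are plain (name, version) tuples, and version_key is a naive integer-based comparison, not WAPT's own version ordering.

```python
def version_key(version):
    """Turn '37.0.2-9' into a comparable tuple of integers (simplified)."""
    return tuple(int(part) for part in version.replace('-', '.').split('.'))


def obsolete_cached(cached, available, installed):
    """Select cached packages that are no longer available, or already
    installed at an equal or newer version.

    cached: list of (name, version); available: set of (name, version);
    installed: dict {name: installed version}. All three are hypothetical inputs.
    """
    obsolete = []
    for name, version in cached:
        no_longer_available = (name, version) not in available
        already_installed = (
            name in installed
            and version_key(installed[name]) >= version_key(version)
        )
        if no_longer_available or already_installed:
            obsolete.append((name, version))
    return obsolete


cached = [('tis-firefox', '37.0.2-9'), ('tis-7zip', '16.4-8'), ('tis-putty', '0.70-1')]
available = {('tis-firefox', '37.0.2-9'), ('tis-7zip', '16.4-8')}
installed = {'tis-7zip': '16.4-8'}
print(obsolete_cached(cached, available, installed))
```
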
cleanup_session_setup()[source]

Remove all current user session_setup information for removed packages

create_or_update_host_certificate(force_recreate=False)[source]
Create an RSA key pair for the host and an x509 certificate.

The key is located in <wapt_root>\private. It should be kept secret, with access restricted to the system account and administrators only.

Args:

force_recreate (bool): recreate key pair even if already exists for this FQDN.

Returns:

str: x509 certificate of this host.

property dbpath
default_config_filename()[source]
delete_audit_data(section, key)[source]
delete_param(name)[source]

Remove a key from local db

dependencies(packagename, expand=False)[source]

Return all dependencies of a given package

>>> w = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> dep = w.dependencies('tis-waptdev')
>>> isinstance(dep,list) and isinstance(dep[0],PackageEntry)
True

download_icons(package_requests, usecache=True, printhook=None)[source]

Download a list of package icons (requests are of the form packagename (>version) ). Returns a dict of {“downloaded”, “skipped”, “errors”}

Args:

package_requests (str or list): list of packages to prefetch

usecache (boolean) : if True, don’t download package if already in cache

printhook (func) : callback with signature report(received,total,speed,url) to display progress

Returns:

dict: with keys {“downloaded”, “skipped”, “errors”, “packages”} and list of PackageEntry.

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> def nullhook(*args):
...     pass
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],usecache=False,printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
download_packages(package_requests, usecache=True, printhook=None)[source]

Download a list of packages (requests are of the form packagename(=version) ). If several packages match a request, only the highest/latest is kept.

Args:

package_requests (str or list): list of packages to prefetch

usecache (boolean) : if True, don’t download package if already in cache

printhook (func) : callback with signature report(received,total,speed,url) to display progress

Returns:

dict: with keys {“downloaded”, “skipped”, “errors”, “packages”} and list of PackageEntry.

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> def nullhook(*args):
...     pass
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],usecache=False,printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
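
The example above passes a hook that ignores its arguments. A hook that records progress could look like the sketch below. Only the report(received,total,speed,url) signature comes from the documentation above; the factory name, the message format and the sample URL are assumptions, and the unit of speed is not specified here.

```python
def make_progress_hook(lines):
    """Build a callback matching the documented report(received,total,speed,url) signature.

    Messages are appended to `lines` so the caller decides how to display them.
    """
    def report(received, total, speed, url):
        # Guard against an unknown or zero total size.
        percent = 100.0 * received / total if total else 0.0
        lines.append('%s: %.0f%% (%d/%d), speed=%s' % (url, percent, received, total, speed))
    return report


lines = []
hook = make_progress_hook(lines)
# Simulated call; a real download would invoke the hook repeatedly.
hook(512, 1024, 100.0, 'https://srvwapt.example/wapt/tis-firefox.wapt')
print(lines[0])
```

Such a hook would be passed as printhook=hook in place of nullhook.
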
download_upgrades()[source]

Download packages that can be upgraded

duplicate_package(packagename, newname=None, newversion=None, newmaturity=None, target_directory=None, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, auto_inc_version=True, usecache=True, printhook=None, cabundle=None)[source]

Duplicate an existing package from a declared repository or file into target_directory, with optional newname and version.

Args:

packagename (str) : packagename to duplicate, or filepath to a local package or package development directory.

newname (str): name of target package

newversion (str): version of target package. if None, use source package version

target_directory (str): path where to put development files. If None, use temporary. If empty, use default development dir

append_depends (list): comma str or list of depends to append.

remove_depends (list): comma str or list of depends to remove.

auto_inc_version (bool): if version is less than existing package in repo, set version to repo version+1

usecache (bool): If True, allow to use cached package in local repo instead of downloading it.

printhook (func): hook for download progress

cabundle (SSLCABundle): list of authorized ca certificate (SSLPublicCertificate) to check authenticity of source packages. If None, no check is performed.

Returns:

PackageEntry : new packageEntry with sourcespath = target_directory

>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> wapt.dbpath = ':memory:'
>>> r= wapt.update()
>>> def nullhook(*args):
...     pass
>>> tmpdir = 'c:/tmp/testdup-wapt'
>>> if os.path.isdir(tmpdir):
...     import shutil
...     shutil.rmtree(tmpdir)
>>> p = wapt.duplicate_package('tis-wapttest',
...     newname='testdup',
...     newversion='20.0-0',
...     target_directory=tmpdir,
...     append_depends=None,
...     auto_inc_version=True,
...     usecache=False,
...     printhook=nullhook)
>>> print(repr(p['package']))
PackageEntry('testdup','20.0-0')
>>> if os.path.isdir(tmpdir):
...     import shutil
...     shutil.rmtree(tmpdir)
>>> p = wapt.duplicate_package('tis-wapttest',
...    target_directory=tempfile.mkdtemp('wapt'),
...    auto_inc_version=True,
...    append_depends=['tis-firefox','tis-irfanview'],
...    remove_depends=['tis-wapttestsub'],
...    )
>>> print(repr(p['package']))
PackageEntry('tis-wapttest','120')
edit_host(hostname, target_directory=None, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, printhook=None, description=None, cabundle=None)[source]

Download and extract a host package from host repositories into target_directory for modification

Args:

hostname (str) : fqdn of the host to edit

target_directory (str) : where to place the developments files. if empty, use default one from wapt-get.ini configuration

append_depends (str or list) : list or comma separated list of package requirements

remove_depends (str or list) : list or comma separated list of package requirements to remove

cabundle (SSLCABundle) : authorized ca certificates. If None, use default from current wapt.

Returns:

PackageEntry

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> tmpdir = 'c:/tmp/dummy'
>>> wapt.edit_host('dummy.tranquilit.local',target_directory=tmpdir,append_depends='tis-firefox')
>>> import shutil
>>> shutil.rmtree(tmpdir)
>>> host = wapt.edit_host('htlaptop.tranquilit.local',target_directory=tmpdir,append_depends='tis-firefox')
>>> 'package' in host
True
>>> shutil.rmtree(tmpdir)
edit_package(packagerequest, target_directory='', use_local_sources=True, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, auto_inc_version=True, cabundle=None)[source]

Download an existing package from repositories into target_directory for modification. If use_local_sources is True and no newer package exists on repos, updates the current local edited data; else, if target_directory exists and is not empty, raises an exception.

Args:

packagerequest (str) : path to existing wapt file, or package request

use_local_sources (boolean) : don’t raise an exception if target exist and match package version

append_depends (list of str) : package requirements to add to depends

remove_depends (list or str) : package requirements to remove from depends

auto_inc_version (bool) :

cabundle (SSLCABundle) : list of authorized certificate filenames. If None, use default from current wapt.

Returns:

PackageEntry : edit local package with sourcespath attribute populated

>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> wapt.dbpath = ':memory:'
>>> r= wapt.update()
>>> tmpdir = tempfile.mkdtemp('wapt')
>>> res = wapt.edit_package('tis-wapttest',target_directory=tmpdir,append_depends='tis-firefox',remove_depends='tis-7zip')
>>> res['target'] == tmpdir and res['package'].package == 'tis-wapttest' and 'tis-firefox' in res['package'].depends
True
>>> import shutil
>>> shutil.rmtree(tmpdir)
error_packages()[source]

return install tasks with error status

forget_packages(packages_list)[source]

Remove install status for packages from local database without actually uninstalling the packages

Args:

packages_list (list): list of installed package names to forget

Returns:

list: list of package names actually forgotten

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> res = wapt.install('tis-test')
???
>>> res = wapt.is_installed('tis-test')
>>> isinstance(res,PackageEntry)
True
>>> wapt.forget_packages('tis-test')
['tis-test']
>>> wapt.is_installed('tis-test')
>>> print(wapt.is_installed('tis-test'))
None
generate_host_uuid(forced_uuid=None)[source]

Regenerate a random UUID for this host or force with supplied one.

Normally, the UUID is taken from BIOS through wmi.

In case bios returns some duplicates or garbage, it can be useful to force a random uuid. This is stored as uuid key in wapt-get.ini.

In case we want to link the host with an existing record on the server, we can force an old UUID.

Args:

forced_uuid (str): uuid to force for this host. If None, generate a random one
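
The choice between a forced and a random UUID can be sketched with the standard uuid module. This is an illustration of the rule only: the function name is hypothetical, the sketch does not read the BIOS or write wapt-get.ini, and the lowercase normalisation is a property of Python's uuid module rather than a documented WAPT behaviour.

```python
import uuid


def choose_host_uuid(forced_uuid=None):
    """Return the forced UUID if supplied, otherwise a new random one."""
    if forced_uuid is not None:
        # uuid.UUID() validates the string and str() returns it in lowercase.
        return str(uuid.UUID(forced_uuid))
    return str(uuid.uuid4())


print(choose_host_uuid('4C4C4544-004E-3510-8051-C7C04F325131'))
print(choose_host_uuid())
```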

get_auth_token(purpose='websocket')[source]

Get an auth token from the server by providing the uuid signed with the private host key.

Returns:

dict

get_cache_domain_info(force=False)[source]
get_cached_packages_uuids()[source]
get_config_files_list()[source]
get_config_hash()[source]
get_default_development_dir(packagecond, section='base')[source]

Returns the default development directory for package named <packagecond> based on default_sources_root ini parameter if provided

Args:

packagecond (PackageEntry or str): either PackageEntry or a “name(=version)” string

Returns:

unicode: path to local proposed development directory

get_host_certificate()[source]

Return the current host certificate. If the certificate does not yet exist, it is created as a self signed certificate and stored in get_host_certificate_filename path

Returns:

SSLCertificate: host public certificate.

get_host_certificate_authority_key_identifier()[source]
get_host_certificate_filename()[source]

Full filepath of the host own certificate and RSA public key

Returns:

str

get_host_certificate_fingerprint()[source]
get_host_certificate_signing_request()[source]

Return a CSR with CN and dnsname pointing to this host uuid

Returns:

SSLCertificateSigningRequest: host public certificate signing request.

get_host_key(create=True)[source]

Return the private key used to sign uploaded data from the host. Create the key if it does not exist yet.

Returns:

SSLPrivateKey: Private key used to sign data posted by host.

get_host_key_filename()[source]

Full filepath of the host own RSA private key

Returns:

str

get_host_locales()[source]
get_host_packages()[source]

Return list of implicit available host packages based on computer UUID and AD Org Units

Returns:

list: list of PackageEntry.

get_host_packages_names()[source]

Return list of implicit host package names based on computer UUID and AD Org Units

Returns:

list: list of str package names.

get_host_site()[source]
get_installed_host_packages()[source]

Get the implicit package names (host and unit packages) which are installed but no longer relevant

Returns:

list: of installed package names

get_installer_defaults(installer_path)[source]

Returns guessed default values for package templates based on installer binary

Args:

installer_path (str): filepath to installer

Returns:

dict:

>>> get_installer_defaults(r'c:\tranquilit\wapt\tests\SumatraPDF-3.1.1-install.exe')
{'description': u'SumatraPDF Installer (Krzysztof Kowalczyk)',
'filename': 'SumatraPDF-3.1.1-install.exe',
'silentflags': '/VERYSILENT',
'simplename': u'sumatrapdf-installer',
'type': 'UnknownExeInstaller',
'version': u'3.1.1'}
>>> get_installer_defaults(r'c:\tranquilit\wapt\tests\7z920.msi')
{'description': u'7-Zip 9.20 (Igor Pavlov)',
'filename': '7z920.msi',
'silentflags': '/q /norestart',
'simplename': u'7-zip-9.20',
'type': 'MSI',
'version': u'9.20.00.0'}
get_json_config_certificates_filenames(config_name)[source]
get_json_config_filename(config_name)[source]

Returns the filename for a json config named <config_name>

get_last_update_status()[source]

Get update status of host as stored at the end of last operation.

Returns:
dict:

‘date’: timestamp of last operation

‘runstatus’: last printed message of wapt core

‘running_tasks’: list of tasks

‘errors’: list of packages not installed properly

‘upgrades’: list of packages which need to be upgraded

get_next_audit_datetime()[source]

Return next datetime for next audit loop = minimum(next_audit_date)

get_outdated_host_packages()[source]

Check and return the host packages which are available and not installed

get_package_attribute(package, key, default_value=None)[source]

Store in local db a key/value pair for later use

get_package_entries(packages_names)[source]

Return the most up-to-date package entries for packages_names. packages_names is either a list or a string.

‘missing’ key lists the package requirements which are not available in the package index.

Args:

packages_names (list or str): list of package requirements

Returns:

dict : {‘packages’:[PackageEntries,],’missing’:[str,]}

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> res = wapt.get_package_entries(['tis-firefox','tis-putty'])
>>> isinstance(res['missing'],list) and isinstance(res['packages'][0],PackageEntry)
True
get_previous_package_params(package_entry)[source]

Return the params used during the previous install of package_entry.package. If there was no previous install, return {}. The params are stored as a json string in the local package status table.

Args:

package_entry (PackageEntry): package request to lookup.

Returns:

dict
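
The "json string or {}" behaviour described above can be sketched without the WAPT database. The function name and the stored value are hypothetical; only the rule (decode the JSON string, fall back to an empty dict) comes from the description.

```python
import json


def previous_params_sketch(stored_install_params):
    """Decode install params stored as a JSON string; return {} if nothing is stored."""
    if not stored_install_params:
        return {}
    return json.loads(stored_install_params)


print(previous_params_sketch('{"company": "toto"}'))
print(previous_params_sketch(None))
```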

get_repo(repo_name)[source]
get_secured_token_generator()[source]
get_sources(package)[source]

Download sources of package (if referenced in package as a https svn) in the current directory

Args:

package (str or PackageRequest): package to get sources for

Returns:

str : checkout directory path

get_token_secret_key() str[source]
get_unrelevant_host_packages()[source]

Get the implicit package names (host and unit packages) which are installed but no longer relevant

Returns:

list: of installed package names

get_wapt_edition()[source]
global_attributes = ['wapt_base_dir', 'waptserver', 'config_filename', 'proxies', 'repositories', 'personal_certificate_path', 'public_certs_dir', 'package_cache_dir', 'dbpath', 'http_proxy', 'use_http_proxy_for_repo', 'use_http_proxy_for_server', 'limit_bandwidth', 'waptservice_user', 'waptservice_password', 'waptservice_admin_filter', 'waptservice_port', 'waptservice_poll_timeout', 'locales', 'custom_tags', 'packages_whitelist', 'packages_blacklist', 'maturities', 'host_uuid', 'use_fqdn_as_uuid', 'use_hostpackages', 'use_ad_groups', 'use_repo_rules', 'host_profiles', 'host_organizational_unit_dn', 'host_ad_site', 'allow_user_service_restart', 'allow_remote_shutdown', 'allow_remote_reboot', 'ldap_auth_server', 'ldap_auth_base_dn', 'ldap_auth_ssl_enabled', 'verify_cert_ldap', 'loglevel', 'loglevel_waptcore', 'loglevel_waptservice', 'loglevel_wapttasks', 'loglevel_waptws', 'loglevel_waptdb', 'loglevel_websocket', 'loglevel_waitress', 'log_to_windows_events', 'download_after_update_with_waptupdate_task_period', 'websockets_ping', 'websockets_retry_delay', 'websockets_check_config_interval', 'websockets_hurry_interval', 'notify_user', 'waptaudit_task_period', 'signature_clockskew', 'wol_relay', 'hiberboot_enabled', 'max_gpo_script_wait', 'pre_shutdown_timeout', 'minimum_battery_percent', 'uninstallkey_timeout', 'check_certificates_validity', 'token_lifetime', 'repositories', 'trust_all_certs_in_pems', 'wapt_temp_dir']
property hiberboot_enabled

get HiberbootEnabled.

host_capabilities()[source]

Return the current capabilities of the host taken into account to determine the packages list and whether an update should be forced (when filter criteria are updated). This includes host certificate, architecture, locale, authorized certificates.

Returns:

dict

host_capabilities_fingerprint()[source]

Return a fingerprint representing the current capabilities of the host. This includes host certificate, architecture, locale, authorized certificates.

Returns:

str
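
One common way to derive a stable fingerprint from a dictionary of capabilities is to hash a canonical JSON serialisation. The sketch below shows that general technique; the hash algorithm, the serialisation and the sample keys are assumptions and may differ from what WAPT actually computes.

```python
import hashlib
import json


def capabilities_fingerprint_sketch(capabilities):
    """Hash a canonical (sorted-key) JSON dump so key order does not matter."""
    canonical = json.dumps(capabilities, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()


# Hypothetical capability values for illustration.
caps_a = {'architecture': 'x64', 'locales': ['fr', 'en']}
caps_b = {'locales': ['fr', 'en'], 'architecture': 'x64'}
print(capabilities_fingerprint_sketch(caps_a) == capabilities_fingerprint_sketch(caps_b))
```

Comparing the stored fingerprint with a freshly computed one is enough to detect that a filter criterion has changed.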

property host_dn
property host_organizational_unit_dn

Get host org unit DN from wapt-get.ini [global] host_organizational_unit_dn if defined or from registry as supplied by AD / GPO process

host_packagename()[source]

Return package name for current computer

property host_profiles
property host_site
property host_uuid: str
http_upload_package(packages, wapt_server_user=None, wapt_server_passwd=None, progress_hook=None)[source]

Upload a package or host package to the waptserver.

Args:

packages (str or list): list of filepaths or PackageEntry to wapt packages to upload

wapt_server_user (str) : user for basic auth on waptserver

wapt_server_passwd (str) : password for basic auth on waptserver

Returns:

>>> from common import *
>>> wapt = Wapt(config_filename = r'C:\tranquilit\wapt\tests\wapt-get.ini')
>>> r = wapt.update()
>>> d = wapt.duplicate_package('tis-wapttest','toto')
>>> print(d)
{'target': u'c:\\users\\htouvet\\appdata\\local\\temp\\toto.wapt', 'package': PackageEntry('toto','119')}
>>> wapt.http_upload_package(d['package'],wapt_server_user='admin',wapt_server_passwd='password')
inc_status_revision(inc=1)[source]
install(apackages=None, force=False, params_dict={}, download_only=False, usecache=True, printhook=None, installed_by=None, only_priorities=None, only_if_not_process_running=False, process_dependencies=True)[source]

Install a list of packages and their dependencies. Packages listed in the conflicts package attribute are removed first.

Returns a dictionary of (package requirement, package) with ‘install’, ‘skipped’, ‘additional’

Args:

apackages (list or str): list of packages requirements “packagename(=version)” or list of PackageEntry.

force (bool) : reinstalls the packages even if it is already installed

params_dict (dict) : parameters passed to the install() procedure in the packages setup.py of all packages as params variables and as “setup module” attributes

download_only (bool) : don’t install package, but only download them

usecache (bool) : use the already downloaded packages if available in cache directory

printhook (func) : hook for progress print

Returns:
dict: with keys [‘skipped’, ‘additional’, ‘remove’, ‘upgrade’, ‘install’, ‘unavailable’] and list of (package requirements, PackageEntry)

>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> def nullhook(*args):
...     pass
>>> res = wapt.install(['tis-wapttest'],usecache=False,printhook=nullhook,params_dict=dict(company='toto'))
>>> isinstance(res['upgrade'],list) and isinstance(res['errors'],list) and isinstance(res['additional'],list) and isinstance(res['install'],list) and isinstance(res['unavailable'],list)
True
>>> res = wapt.remove('tis-wapttest')
>>> res == {'removed': ['tis-wapttest'], 'errors': []}
True
install_immediate(force=False, only_priorities=None, only_if_not_process_running=False)[source]

Install pending packages which must be forcibly installed at a specific time.

Args:

only_if_not_process_running: install package only if impacted_process are not running

Returns:
dict: {‘upgrade’: [], ‘additional’: [], ‘downloads’: {‘downloaded’: [], ‘skipped’: [], ‘errors’: []}, ‘remove’: [], ‘skipped’: [], ‘install’: [], ‘errors’: [], ‘unavailable’: []}

install_json_config(conf, config_name=None, priority=None)[source]

Add a dynamic configuration from dict conf with name config_name and priority

install_json_config_file(json_config_file, config_name=None, priority=None)[source]

Install a json config stored in a file

install_json_config_from_url(url=None, config_hash=None, config_name='default_config', priority=None)[source]

Load a json config from the remote wapt server (default location is /var/www/wapt/conf.d/<config_name>-<config_hash>.json) given its name and hash.

install_json_configb64(json_config_b64, config_name=None, priority=None)[source]

Install a json config encoded as a base64 string
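
A base64 string suitable for this kind of call can be produced with the standard library. The sketch below only shows the encoding round trip; the helper names and the content of the sample configuration are hypothetical, and the exact JSON layout expected by WAPT is not described in this document.

```python
import base64
import json


def encode_json_config(conf):
    """Serialise a dict to JSON and encode it as an ASCII base64 string."""
    raw = json.dumps(conf, sort_keys=True).encode('utf-8')
    return base64.b64encode(raw).decode('ascii')


def decode_json_config(json_config_b64):
    """Reverse of encode_json_config: base64 string back to a dict."""
    return json.loads(base64.b64decode(json_config_b64).decode('utf-8'))


# Hypothetical configuration content, for illustration only.
conf = {'global': {'use_hostpackages': '1'}}
encoded = encode_json_config(conf)
print(encoded)
print(decode_json_config(encoded) == conf)
```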

install_wapt(fname, params_dict={}, explicit_by=None, force=None)[source]

Install a single wapt package given its WAPT filename. return install status

Args:

fname (str): Path to wapt Zip file or unzipped development directory

params_dict (dict): custom parameters for the install function

explicit_by (str): identify who has initiated the install

Returns:

str: ‘OK’,’ERROR’

Raises:

EWaptMissingCertificate

EWaptNeedsNewerAgent

EWaptUnavailablePackage

EWaptConflictingPackage

EWaptBadPackageAttribute

EWaptException

various Exception depending on setup script

installed(include_errors=False)[source]

Returns all installed packages with their status

Args:

include_errors (boolean): include packages not installed successfully

Returns:

list: list of PackageEntry merged with local install status.

inventory()[source]

Return full inventory of the computer as a dictionary.

Returns:

dict: {‘host_info’,’wapt_status’,’dmi’,’installed_softwares’,’installed_packages’}

Changed:

1.4.1: renamed keys

1.6.2.4: removed setup.py from packages inventory.

is_authorized_package_action(action, package, user_groups=[], rules=None)[source]
is_available(packagename)[source]

Check if a package (with optional version condition) is available in repositories.

Args:

packagename (str) : package name to lookup or package requirement ( packagename(=version) )

Returns:

list : of PackageEntry sorted by package version ascending

>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> l = wapt.is_available('tis-wapttest')
>>> l and isinstance(l[0],PackageEntry)
True
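
The "packagename(=version)" requirement form and the ascending version sort can be illustrated with a small stand-alone parser. This is a sketch under stated assumptions: the regular expression, the set of supported operators and the integer-based version_key are simplifications, not WAPT's PackageRequest implementation, and candidates are plain (name, version) tuples.

```python
import operator
import re

REQUIREMENT = re.compile(r'^\s*([^\s(]+)\s*(?:\(\s*(=|>=|<=|>|<)\s*([^)\s]+)\s*\))?\s*$')
OPS = {'=': operator.eq, '>=': operator.ge, '<=': operator.le,
       '>': operator.gt, '<': operator.lt}


def version_key(version):
    """Naive comparable key: '37.0.2-9' -> (37, 0, 2, 9)."""
    return tuple(int(part) for part in re.split(r'[.-]', version))


def parse_requirement(req):
    """Split 'name (=version)' into (name, operator, version); the last two may be None."""
    match = REQUIREMENT.match(req)
    if not match:
        raise ValueError('invalid package requirement: %r' % req)
    return match.group(1), match.group(2), match.group(3)


def matching_entries(req, candidates):
    """Return the candidates matching req, sorted by version ascending."""
    name, op, version = parse_requirement(req)
    found = [
        (n, v) for n, v in candidates
        if n == name and (op is None or OPS[op](version_key(v), version_key(version)))
    ]
    return sorted(found, key=lambda item: version_key(item[1]))


candidates = [('tis-wapttest', '120'), ('tis-wapttest', '119'), ('tis-firefox', '37.0.2-9')]
print(matching_entries('tis-wapttest', candidates))
print(matching_entries('tis-wapttest (=119)', candidates))
```

Note that this naive key treats '10' and '10.0' as different versions; a real comparison would need to handle such cases.
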
is_enterprise()[source]
is_installed(packagename, include_errors=False)[source]

Checks if a package is installed. Return package entry and additional local status or None

Args:

packagename (str): name / {package_uuid}/ package request to query

Returns:
PackageEntry: None or PackageEntry merged with local install_xxx fields
  • install_date

  • install_output

  • install_params

  • install_status

is_locally_allowed_package(package)[source]

Return True if package is not in the blacklist and is in the whitelist (if the whitelist is not None). packages_whitelist and packages_blacklist are lists of package name wildcards (file style wildcards). The blacklist is taken into account first if defined. The whitelist is taken into account if not None, else all non-blacklisted package names are allowed.
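
The blacklist-then-whitelist rule can be sketched with file style wildcards from the standard fnmatch module. The function name is hypothetical and the sketch takes a plain package name rather than a PackageEntry; note also that fnmatch is case-sensitive or not depending on the operating system.

```python
from fnmatch import fnmatch


def is_locally_allowed_sketch(package_name, whitelist=None, blacklist=None):
    """Apply the blacklist first, then the whitelist if one is defined."""
    if blacklist and any(fnmatch(package_name, pattern) for pattern in blacklist):
        return False
    if whitelist is not None:
        return any(fnmatch(package_name, pattern) for pattern in whitelist)
    # No whitelist: every non-blacklisted name is allowed.
    return True


print(is_locally_allowed_sketch('tis-firefox', whitelist=['tis-*'], blacklist=['tis-fire*']))
print(is_locally_allowed_sketch('tis-7zip', whitelist=['tis-*'], blacklist=['tis-fire*']))
```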

is_wapt_package_development_dir(directory)[source]

Return PackageEntry if directory is a wapt development directory (a WAPT/control file exists) or False

is_wapt_package_file(filename)[source]

Return PackageEntry if filename is a wapt package, or False. A file is a wapt package if it ends with .wapt and its control file can be loaded and decoded from the zip file.

Args:

filename (str): path to a file

Returns:

False or PackageEntry

last_install_log(packagename)[source]

Get the printed output of the last install of package named packagename

Args:

packagename (str): name of package to query

Returns:

dict: {status,log} of the last install of a package

>>> w = Wapt()
>>> w.last_install_log('tis-7zip')
???
{'install_status': u'OK', 'install_output': u'Installing 7-Zip 9.38.0-1\n7-Zip already installed, skipping msi install\n', 'install_params': ''}
list(searchwords=[])[source]

Returns a list of installed packages which have the searchwords in their description

Args:
searchwords (list): list of words to look up in package name and description. Only entries which have the words in the proper order are returned.

Returns:

list: list of PackageEntry matching the search words

>>> w = Wapt()
>>> w.list('zip')
[PackageEntry('tis-7zip','16.4-8') ]
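
The "words in the proper order" rule can be illustrated on a plain string. This is a sketch only: the function name is hypothetical, the text stands in for a package name plus description, and the case-insensitive comparison is an assumption.

```python
def matches_in_order(text, searchwords):
    """True if every search word occurs in text, each after the previous one."""
    text = text.lower()
    position = 0
    for word in searchwords:
        found = text.find(word.lower(), position)
        if found < 0:
            return False
        # Continue searching after the end of the word just found.
        position = found + len(word)
    return True


description = 'tis-7zip 7-Zip file archiver'
print(matches_in_order(description, ['zip', 'archiver']))
print(matches_in_order(description, ['archiver', 'zip']))
```
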
list_upgrade(current_datetime=None)[source]

Returns a list of package requirements for packages which should be installed / upgraded / removed

Returns:

dict: {‘additional’: [], ‘install’: [], ‘remove’: [], ‘upgrade’: []}

load_config(config_filename=None, merge_config_packages=None)[source]

Load configuration parameters from supplied inifilename

make_group_template(packagename='', maturity=None, depends=None, conflicts=None, directoryname=None, section='group', description=None)[source]

Creates or updates on disk a skeleton of a WAPT group package. If a package skeleton already exists in directoryname, it is updated.

The sourcespath attribute of the returned PackageEntry is populated with the development directory of the group package.

Args:

packagename (str): group name

depends :

conflicts

directoryname

section

description

Returns:

PackageEntry

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> tmpdir = 'c:/tmp/dummy'
>>> if os.path.isdir(tmpdir):
...    import shutil
...    shutil.rmtree(tmpdir)
>>> p = wapt.make_group_template(packagename='testgroupe',directoryname=tmpdir,depends='tis-firefox',description=u'Test de groupe')
>>> print(p)
>>> print(p['package'].depends)
tis-firefox
>>> import shutil
>>> shutil.rmtree(tmpdir)
make_host_template(packagename='', depends=None, conflicts=None, directoryname=None, description=None)[source]
make_package_template(installer_path='', packagename='', directoryname='', section='base', description=None, depends='', version=None, silentflags=None, uninstallkey=None, maturity=None, architecture='all', target_os='all', product_name=None)[source]
Build a skeleton of WAPT package based on the properties of the supplied installer

Return the path of the skeleton

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> wapt.dbpath = ':memory:'
>>> files = 'c:/tmp/files'
>>> if not os.path.isdir(files):
...    os.makedirs(files)
>>> tmpdir = 'c:/tmp/dummy'
>>> devdir = wapt.make_package_template(files,packagename='mydummy',directoryname=tmpdir,depends='tis-firefox')
>>> os.path.isfile(os.path.join(devdir,'WAPT','control'))
True
>>> p = wapt.build_package(devdir)
>>> 'filename' in p and isinstance(p['files'],list) and isinstance(p['package'],PackageEntry)
True
>>> import shutil
>>> shutil.rmtree(tmpdir)
property max_gpo_script_wait

get / set the MaxGPOScriptWait.

merge_installed_softwares_and_wua_list()[source]
property merged_config_hash
network_reconfigure()[source]

Called whenever the network configuration has changed

packages_filter_for_host()[source]

Returns a PackageRequest object based on host capabilities to filter applicable packages from a repo

Returns:

PackageRequest

packages_matching(package_request=None, query=None, args=())[source]

Returns the list of known packages Entries matching a PackageRequest

Args:

package_request (PackageRequest): request

Returns:

list (of PackageEntry)

personal_certificate()[source]

Returns the personal certificates chain

Returns:

list (of SSLCertificate). The first one is the personal certificate. The others are useful if intermediate CAs are used.

property pre_shutdown_timeout

get / set the pre-shutdown timeout for shutdown tasks.

private_key(private_key_password=None)[source]

SSLPrivateKey matching the personal_certificate When key has been found, it is kept in memory for later use.

Args:

private_key_password : password to use to decrypt key. If None, passwd_callback is called.

Returns:

SSLPrivateKey

Raises:

EWaptMissingPrivateKey if the key cannot be decrypted or found.

property private_key_password_callback
reachable_ip()[source]

Return the local IP which is most probably reachable by the WAPT server.

In case there are several network connections, returns the local IP which Windows chooses for sending packets to WaptServer.

This can be the most probable IP which would get packets from WaptServer.

Returns:

str: Local IP
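
A common way to find the local address the operating system would use for a given destination is to connect a UDP socket and read its local name. The sketch below illustrates that technique only; it is not taken from the WAPT sources, and `guess_reachable_ip` and the placeholder server address `192.0.2.1` are assumptions made for the example.

```python
import socket

def guess_reachable_ip(server_host="192.0.2.1", server_port=80):
    """Return the local IPv4 address the OS would pick to reach server_host."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # connect() on a UDP socket sends nothing; it only selects a route,
        # which fixes the local address of the socket.
        sock.connect((server_host, server_port))
        return sock.getsockname()[0]
    except OSError:
        # No usable route (for example no network): fall back to loopback.
        return "127.0.0.1"
    finally:
        sock.close()

local_ip = guess_reachable_ip()
print(local_ip)
```

On a host with several interfaces the result depends on the routing table, which matches the "most probable IP" wording above.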

read_audit_data(section, key, default=None, ptype=None, include_expired_data=True)[source]

Retrieve the latest value associated with section/key from database

read_audit_data_set(section, key, as_dict=False, raw_data=False, descending=True)[source]

Retrieve all the values associated with section/key from database

read_audit_data_since(last_query_date=None, raw_data=False)[source]

Retrieve all the values stored in database since last_query_date

read_param(name, default=None, ptype=None)[source]

read a param value from local db

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> wapt.read_param('db_version')
u'20140410'

read_upgrade_status()[source]

Return last stored pending updates status

Returns:

dict: {running_tasks, errors, pending (dict), upgrades (list)}

register_computer(description=None, retry_with_password=False)[source]

Send computer information to the WAPT Server. If description is provided, updates local registry with the new description

Returns:

dict: response from server.

>>> wapt = Wapt()
>>> s = wapt.register_computer()
registration_inventory()[source]

Minimum inventory for initial registration

registry_uninstall_snapshot()[source]

Return the list of uninstall IDs from registry. Launched before and after an installation to capture the uninstallkey

reload_config_if_updated()[source]

Check if config file has been updated. Return None if config has not changed, or the date of the new config file if reloaded

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> wapt.reload_config_if_updated()
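
One simple way to implement this kind of check is to compare the modification time of the file with the one recorded at load time. The following is a minimal sketch under that assumption; `ConfigWatcher` and `reload_if_updated` are hypothetical names, and the real method may detect changes differently.

```python
import os
import tempfile

class ConfigWatcher:
    """Hypothetical helper: remembers the mtime of the config it loaded."""

    def __init__(self, config_filename):
        self.config_filename = config_filename
        self.loaded_mtime = os.path.getmtime(config_filename)

    def reload_if_updated(self):
        current_mtime = os.path.getmtime(self.config_filename)
        if current_mtime > self.loaded_mtime:
            # A real implementation would re-read the ini file here.
            self.loaded_mtime = current_mtime
            return current_mtime
        return None

# Demonstration on a temporary file standing in for wapt-get.ini.
fd, path = tempfile.mkstemp(suffix=".ini")
os.close(fd)
watcher = ConfigWatcher(path)
unchanged = watcher.reload_if_updated()
# Simulate an edit by moving the modification time 10 seconds forward.
newer = watcher.loaded_mtime + 10
os.utime(path, (newer, newer))
changed = watcher.reload_if_updated()
again = watcher.reload_if_updated()
os.remove(path)
print(unchanged, changed is not None, again)
```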
remove(packages_list, force=False, only_priorities=None, only_if_not_process_running=False)[source]

Removes a package given its package name and unregisters it from the local status DB

Args:

packages_list (str or list or path): packages to remove (package name, list of package requirement, package entry or development directory)

force : if True, unregister package from local status database, even if uninstall has failed

Returns:

dict: {'errors': [], 'removed': [], 'not_allowed': []}

remove_json_config(config_name)[source]

Remove a json config given its name

remove_upgrade_shutdown_policy()[source]

Remove the local shutdown policy used to upgrade the system

property repositories
reset_host_organizational_unit_dn()[source]

Reset forced host_organizational_unit_dn to AD / GPO registry defaults. If it was forced in ini file, remove setting from ini file.

reset_host_uuid()[source]

Reset host uuid to bios provided UUID. If it was forced in ini file, remove setting from ini file.

reset_local_password()[source]

Remove the local waptservice auth from ini file

reset_settings()[source]
run(*arg, **args)[source]
run_notfatal(*cmd, **args)[source]

Runs the command and waits for its termination. Returns the output; doesn’t raise an exception if the exit code is not null, but returns '' instead

running_tasks()[source]

return current install tasks

property runstatus: str

returns the current run status for tray display

save_domain_info(info)[source]
save_external_ip(ip)[source]
save_last_domain_info_date(lastdate)[source]
search(searchwords=[], exclude_host_repo=True, section_filter=None, newest_only=False)[source]

Returns a list of packages which have the searchwords in their description

Args:

searchwords (str or list): words to search in packages name or description

exclude_host_repo (boolean): if True, don’t search in host repositories.

section_filter (str or list): restrict search to the specified package sections/categories

Returns:

list: list of PackageEntry

self_service_auth(login, password, host_uuid, groups)[source]

Sends login and password to server, who then checks if it’s a valid user

Returns:

list: groups the user belongs to.

server_uuid()[source]
session_setup(package, force=False)[source]

Set up the user session for a specific system-wide installed package. Source setup.py from database or filename

set_client_cert_auth(connection, force=False)[source]

Set client side ssl authentication for a waptserver or a waptrepo using host_certificate if client_certificate is not yet set in config and host certificate is able to do client_auth

Args:

connection: object with client_certificate, client_private_key and client_auth

set_local_password(user='admin', pwd='password')[source]

Set admin/password local auth for waptservice in ini file as a sha256 hex hash
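
As an illustration of what "a sha256 hex hash" of a password looks like, here is a minimal sketch using the standard `hashlib` module. The helper name `sha256_hex` and the UTF-8 encoding are assumptions; the exact layout WAPT writes to the ini file is not shown in this documentation.

```python
import hashlib

def sha256_hex(pwd):
    """Return the SHA-256 digest of pwd as a 64-character hex string."""
    return hashlib.sha256(pwd.encode("utf-8")).hexdigest()

digest = sha256_hex("password")
print(len(digest), digest)
```

Storing only the digest means the clear-text password never has to be kept in the configuration file.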

set_package_attribute(package, key, value)[source]

Store in local db a key/value pair for later use

show_progress(show_box=False, msg='Loading...', progress=None, progress_max=None)[source]

Global hook to report progress feedback to the user

Args:

show_box (bool): indicate to display or hide the notification

msg (str): A status message to display. If None, nothing is changed.

progress (float): Completion

progress_max (float): Target of completion.

sign_host_content(data)[source]

Sign data str with host private key with sha256 + RSA

Args:

data (bytes) : data to sign

Returns:

bytes: signature of sha256 hash of data.

sign_package(zip_or_directoryname, certificate=None, private_key_password=None, private_key=None, set_maturity=None, inc_package_release=False, keep_signature_date=False, excludes=[])[source]
Calc the signature of the WAPT/manifest.sha256 file and put/replace it in ZIP or directory.

If directory, creates WAPT/manifest.sha256 and adds it to the content of the package, then creates a WAPT/signature file and adds it to the directory or zip file.

Known issue: if the zip file already contains a manifest.sha256 file, it is not removed, so there will be 2 manifest files in the zip / wapt package.

Args:

zip_or_directoryname: filename or path for the wapt package’s content

certificate (list): certificates chain of signer.

private_key (SSLPrivateKey): the private key to use

private_key_password (str) : passphrase to decrypt the private key. If None provided, use self.private_key_password_callback

Returns:

str: base64 encoded signature of manifest.sha256 file (content

store_upgrade_status(upgrades=None)[source]

Stores in DB the current pending upgrades and running installs for query by waptservice

trust_package_signer(cert)[source]

Add a certificate to the list of trusted package signers certificates Stores the PEM encoded data in public_certs_dir directory. The certificate is added to this current Wapt.cabundle instance for immediate use (until config is reloaded)

uninstall(packagename, params_dict={}, force=False)[source]

Launch the uninstall script of an installed package. Source setup.py from database or filename

uninstall_cmd(guid)[source]

return the (quiet) command stored in registry to uninstall a software given its registry key

unregister_computer()[source]

Remove computer information from WAPT Server

Returns:

dict: response from server.

>>> wapt = Wapt()
>>> s = wapt.unregister_computer()
untrust_package_signer(cert)[source]

Removes a certificate from the list of trusted package signers certificates based on the fingerprint of the certificate. Certs in ssl public_certs_dir with matching fingerprint are deleted.

update(force=False, register=True)[source]

Update local database with packages definition from repositories

Args:

force (boolean): update even if Packages index on repository has not been updated since last update (based on http headers)

register (boolean): Send information about status of local packages to waptserver

Returns:

list of (host package entry,entry date on server)

Returns:

dict: {"added","removed","count","repos","upgrades","date"}

>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> updates = wapt.update()
>>> 'count' in updates and 'added' in updates and 'upgrades' in updates and 'date' in updates and 'removed' in updates
True
update_crls(force=False)[source]
update_licences(force=False)[source]
update_package_install_status(**kwargs)[source]

Update the install status

update_repo_rules(force=False)[source]
update_server_status(force=False, include_wmi=None, include_dmi=None)[source]
Send host_info, installed packages and installed softwares, and last update status information to WAPT Server, but don’t send register info like dmi or wmi.

Changed in version 1.4.3: if last status has been properly sent to server and data has not changed, don’t push data again to server. The hash is stored in memory, so it is not shared across threads or processes.
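
The "skip if unchanged" behaviour described above can be pictured as hashing the payload and comparing it with the hash of the last payload that was sent successfully. The sketch below is an illustration under that assumption, not the WAPT implementation; `StatusPusher`, `push` and the `send` callable are hypothetical.

```python
import hashlib
import json

class StatusPusher:
    """Hypothetical sender which skips payloads identical to the last one sent."""

    def __init__(self, send):
        self.send = send            # callable(payload) -> True on success
        self.last_sent_hash = None  # kept in memory only, lost on restart

    def push(self, status, force=False):
        # sort_keys gives a stable serialization, so equal data gives equal hashes.
        payload = json.dumps(status, sort_keys=True).encode("utf-8")
        digest = hashlib.sha256(payload).hexdigest()
        if not force and digest == self.last_sent_hash:
            return False
        if self.send(payload):
            self.last_sent_hash = digest
            return True
        return False

sent = []
pusher = StatusPusher(lambda payload: sent.append(payload) or True)
results = [
    pusher.push({"packages": ["tis-7zip"]}),
    pusher.push({"packages": ["tis-7zip"]}),
    pusher.push({"packages": ["tis-7zip", "tis-firefox"]}),
]
print(results, len(sent))
```

Because the hash lives in the object, a second process or a fresh instance would send the data again, which is consistent with the note about threads and processes.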

>>> wapt = Wapt()
>>> s = wapt.update_server_status()
upgrade(only_priorities=None, only_if_not_process_running=False)[source]

Install “well known” host package from main repository if not already installed, then query localstatus database for packages with a version older than repository and install all newest packages

Args:

only_priorities (list of str): If not None, upgrade only packages with these priorities.

Returns:

dict: {'upgrade': [], 'additional': [], 'downloads': {'downloaded': [], 'skipped': [], 'errors': []}, 'remove': [], 'skipped': [], 'install': [], 'errors': [], 'unavailable': []}

upload_package(filenames, wapt_server_user=None, wapt_server_passwd=None)[source]

Method to upload a package using a shell command (like scp) instead of http upload. You must first define a command in the ini file with the form:

upload_cmd="c:\Program Files"\putty\pscp -v -l waptserver %(waptfile)s srvwapt:/var/www/%(waptdir)s/

or

upload_cmd="C:\Program Files\WinSCP\WinSCP.exe" root@wapt.tranquilit.local /upload %(waptfile)s

You can define an “after_upload” shell command. Typical use is to update the Packages index:

after_upload="c:\Program Files"\putty\plink -v -l waptserver srvwapt.tranquilit.local "python /opt/wapt/wapt-scanpackages.py /var/www/%(waptdir)s/"
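
The `%(waptfile)s` and `%(waptdir)s` placeholders in the commands above use the same syntax as Python's mapping-based `%` formatting. Assuming the command is expanded that way (the documentation does not say so explicitly), the substitution can be sketched as follows; `expand_command` and the sample values are hypothetical.

```python
# Simplified command template, using the placeholder names shown above.
upload_cmd = "pscp -v -l waptserver %(waptfile)s srvwapt:/var/www/%(waptdir)s/"

def expand_command(template, waptfile, waptdir):
    """Fill the %(name)s placeholders of a command template."""
    return template % {"waptfile": waptfile, "waptdir": waptdir}

cmd = expand_command(upload_cmd, "tis-7zip.wapt", "wapt")
print(cmd)
```

A template may use only some of the placeholders; unused keys in the mapping are simply ignored by the `%` operator.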

wapt_status()[source]

Wapt configuration and version informations

Returns:
dict: versions of main files, waptservice config, repos and waptserver config

>>> w = Wapt()
>>> w.wapt_status()
{
        'setuphelpers-version': '1.1.1',
        'waptserver': {
                'wapt_server': u'tranquilit.local',
                'proxies': {
                        'http': None,
                        'https': None
                },
                'server_url': 'https://wapt.tranquilit.local'
        },
        'waptservice_protocol': 'http',
        'repositories': [{
                'wapt_server': u'tranquilit.local',
                'proxies': {
                        'http': None,
                        'https': None
                },
                'name': 'global',
                'repo_url': 'http://wapt.tranquilit.local/wapt'
        },
        {
                'wapt_server': u'tranquilit.local',
                'proxies': {
                        'http': None,
                        'https': None
                },
                'name': 'wapt-host',
                'repo_url': 'http://srvwapt.tranquilit.local/wapt-host'
        }],
        'common-version': '1.1.1',
        'wapt-exe-version': u'1.1.1.0',
        'waptservice_port': 8088,
        'wapt-py-version': '1.1.1'
}
property waptdb: WaptDB

Wapt database

property waptserver
waptserver_available()[source]

Test reachability of waptserver.

If waptserver is defined and available, return True, else False

Returns:

boolean: True if server is defined and actually reachable

property waptsessiondb: WaptSessionDB

Wapt user session database

write_audit_data(section, key, value, ptype=None, value_date=None, expiration_date=None, max_count=2, keep_days=None)[source]

Stores a metric in the database and removes expired ones

Args:

section (str)

key (str)

value (any)

value_date (str or datetime): value date (utc). By default datetime.datetime.utcnow()

expiration_date (str) : expiration date of the new value

max_count (int) : keep at most max_count values. Remove the oldest ones.

keep_days (int) : set the expiration date to now + keep_days days. Overrides expiration_date arg if not None

Returns:

None
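
To make the max_count retention rule concrete, here is a small self-contained sketch on an in-memory SQLite database. The table layout, the column names and the helper `write_audit_value` are assumptions made for the illustration; they are not the schema used by WAPT.

```python
import datetime
import sqlite3

def write_audit_value(db, section, key, value, max_count=2):
    """Insert a value, then keep only the max_count newest rows for section/key."""
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    with db:  # commits on success, rolls back on error
        db.execute(
            "insert into audit_data(section, key, value, value_date) values (?,?,?,?)",
            (section, key, value, now),
        )
        db.execute(
            "delete from audit_data where section=? and key=? and id not in ("
            " select id from audit_data where section=? and key=?"
            " order by id desc limit ?)",
            (section, key, section, key, max_count),
        )

db = sqlite3.connect(":memory:")
db.execute(
    "create table audit_data (id integer primary key autoincrement,"
    " section text, key text, value text, value_date text)"
)
for value in ("10", "20", "30"):
    write_audit_value(db, "disk", "free_gb", value)

kept = [row[0] for row in db.execute(
    "select value from audit_data where section=? and key=? order by id",
    ("disk", "free_gb"),
)]
print(kept)
```

With the default max_count=2, the third write pushes out the oldest value, so only the two most recent values remain.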

write_audit_data_if_changed(section, key, value, ptype=None, value_date=None, expiration_date=None, max_count=2, keep_days=None)[source]

Write data only if different from last one

Returns:

previous value

write_config(config_filename=None)[source]

Update configuration parameters to supplied inifilename

write_param(name, value)[source]

Store in local db a key/value pair for later use

property wua_repository
class common.WaptBaseDB(dbpath)[source]

Bases: BaseObjectClass

begin()[source]
commit()[source]
curr_db_version = None
property db
property db_version
property dbpath
delete_param(name)[source]
execute(query, args=())[source]

Wrapper around sqlite.execute.

get_param(name, default=None, ptype=None)[source]

Retrieve the value associated with name from database

initdb()[source]
query(query, args=(), one=False, as_dict=True)[source]

Execute the query on the db and return a list of dictionaries

rollback()[source]
set_package_attribute(install_id, key, value)[source]

Store permanently a (key/value) pair in database for a given package, replace existing one

set_param(name, value, ptype=None)[source]

Store permanently a (name/value) pair in database, replace existing one
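
The "replace existing one" semantics of a name/value store map naturally onto SQLite's `insert or replace` with a primary key on the name. The sketch below shows that pattern in isolation; the table `params` and the helpers `store_param` and `fetch_param` are hypothetical and are not the WaptBaseDB code.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("create table params (name text primary key, value text)")

def store_param(name, value):
    """Store a name/value pair, replacing any previous value for name."""
    with db:
        db.execute(
            "insert or replace into params(name, value) values (?, ?)",
            (name, value),
        )

def fetch_param(name, default=None):
    """Return the stored value for name, or default if it is missing."""
    row = db.execute("select value from params where name=?", (name,)).fetchone()
    return row[0] if row else default

store_param("db_version", "20140410")
store_param("db_version", "20210420")  # replaces the first value
print(fetch_param("db_version"), fetch_param("missing", "n/a"))
```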

upgradedb(force=False)[source]

Update local database structure to current version if rules are described in db_upgrades

Args:

force (bool): force upgrade even if structure version is greater than requested.

Returns:

tuple: (old_structure_version,new_structure_version)

class common.WaptDB(dbpath)[source]

Bases: WaptBaseDB

Class to manage SQLite database with local installation status

add_package_entry(package_entry, locale_code=None)[source]

Add a package into the database

add_start_install(package_entry, params_dict={}, explicit_by=None)[source]

Register the start of installation in local db

Args:

params_dict (dict) : dictionary of parameters provided on command line with --param or by the server

explicit_by (str) : username of initiator of the install. If not None, install is not a dependency but an explicit manual install

setuppy (str): python source code used for install, uninstall or session_setup. Code used for uninstall or session_setup must use only wapt self library as package content is no longer available at this step.

Returns:

int : rowid of the inserted install status row

audit_status()[source]

Return WORST audit status among properly installed packages

build_depends(packages, packages_filter=None)[source]

Given a list of packages conditions (packagename (optionalcondition)) return a list of dependencies (packages conditions) to install

Args:

packages (list of str): list of packages requirements ( package_name(=version) )

Returns:

(list depends,list conflicts,list missing) : tuple of (all_depends,missing_depends)

TODO : choose available dependencies in order to reduce the number of new packages to install

>>> waptdb = WaptDB(':memory:')
>>> office = PackageEntry('office','0')
>>> firefox22 = PackageEntry('firefox','22')
>>> firefox22.depends = 'mymissing,flash'
>>> firefox24 = PackageEntry('firefox','24')
>>> thunderbird = PackageEntry('thunderbird','23')
>>> flash10 = PackageEntry('flash','10')
>>> flash12 = PackageEntry('flash','12')
>>> office.depends='firefox(<24),thunderbird,mymissing'
>>> firefox22.depends='flash(>=10)'
>>> firefox24.depends='flash(>=12)'
>>> waptdb.add_package_entry(office)
>>> waptdb.add_package_entry(firefox22)
>>> waptdb.add_package_entry(firefox24)
>>> waptdb.add_package_entry(flash10)
>>> waptdb.add_package_entry(flash12)
>>> waptdb.add_package_entry(thunderbird)
>>> waptdb.build_depends('office')
([u'flash(>=10)', u'firefox(<24)', u'thunderbird'], [u'mymissing'])
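
The idea behind the doctest above (collect dependencies depth first and report unknown names separately) can be sketched without a database. This simplified version ignores version conditions and conflicts, and `resolve_depends` with its `known` mapping is a hypothetical stand-in rather than the WaptDB algorithm.

```python
def resolve_depends(package, known):
    """Return (depends, missing) for package.

    known maps a package name to the list of names it depends on.
    Dependencies of a dependency are listed before the dependency itself.
    """
    depends, missing, seen = [], [], set()

    def visit(name):
        for dep in known.get(name, []):
            if dep in seen:
                continue
            seen.add(dep)
            if dep not in known:
                missing.append(dep)
            else:
                visit(dep)
                depends.append(dep)

    visit(package)
    return depends, missing

known = {
    "office": ["firefox", "thunderbird", "mymissing"],
    "firefox": ["flash"],
    "thunderbird": [],
    "flash": [],
}
depends, missing = resolve_depends("office", known)
print(depends, missing)
```

The `seen` set also protects against dependency cycles, since a name is never visited twice.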
curr_db_version = '20210420'
get_not_installed_from_uuids(uuids)[source]
get_packages_from_uuids(uuids)[source]
initdb()[source]

Initialize current sqlite db with empty table and return structure version

install_status(id)[source]

Return the local install status for id

Args:

id: sql rowid

Returns:

dict : merge of package local install, audit and package attributes.

installed(include_errors=False, include_setup=True)[source]

Return a list of installed packages on this host (status 'OK' or 'UNKNOWN')

Args:

include_errors (bool): if False, only packages with status 'OK' and 'UNKNOWN' are returned; if True, all packages are returned.

include_setup (bool) : if True, setup.py files content is in the result rows

Returns:

list: of installed PackageEntry

installed_matching(package_cond, include_errors=False, include_setup=True)[source]

Return a list of PackageEntry for properly installed packages (if include_errors=False) which match the package condition 'tis-package (>=version)'

Args:

package_cond (str): package requirement to lookup

include_errors (bool): if False, only properly installed packages are considered

Returns:

list of PackageEntry merge with localstatus attributes WITH setuppy

installed_package_names(include_errors=False)[source]
installed_packages_inventory()[source]

Return a list of installed packages status suitable for inventory

List of list. First line is the header (fields names)

Returns:

list (of list): of installed packages

Return a list of installed package entries based on search keywords

Returns:

list of PackageEntry merge with localstatus attributes without setuppy

known_packages()[source]

Return a dict of all known packages PackageKey(s) indexed by package_uuid

Returns:

dict {‘package_uuid’:PackageKey(package)}

packages_audit_inventory(after_date=None)[source]

Return a list of packages audit status suitable for inventory

List of list. First line is the header (fields names)

Returns:

list (of list): of installed packages

packages_matching(package_cond)[source]

Return an ordered list of available packages entries which match the condition “packagename[([=<>]version)]?” version ascending

Args:

package_cond (PackageRequest or str): filter packages and determine the ordering

Returns:

list of PackageEntry
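
A package condition of the form "packagename[([=<>]version)]?" can be split into its parts with a regular expression. The parser below is a sketch written for this page; the pattern `COND_RE` and the function `parse_package_cond` are assumptions and may accept a slightly different syntax than WAPT's own PackageRequest.

```python
import re

# name, then an optional "(operator version)" group
COND_RE = re.compile(
    r"^\s*(?P<package>[^\s()]+)\s*"
    r"(\(\s*(?P<op>[<>=]{1,2})\s*(?P<version>[^)\s]+)\s*\))?\s*$"
)

def parse_package_cond(cond):
    """Split 'name(op version)' into (name, op, version); op/version may be None."""
    match = COND_RE.match(cond)
    if match is None:
        raise ValueError("invalid package condition: %r" % cond)
    return match.group("package"), match.group("op"), match.group("version")

for cond in ("firefox(<24)", "tis-package (>=1.2)", "thunderbird"):
    print(parse_package_cond(cond))
```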

Return a list of package entries matching the search words

Args:

searchwords (list): list of words which must be in package name or description

exclude_host (bool): don’t take into account packages coming from a repo named 'wapt-host'

section_filter (list): list of packages sections to take into account

packages_filter (PackageRequest): additional filters (arch, locale, maturities etc…) to take into account for filter and sort

Returns:

list of PackageEntry

params(packagename)[source]

Return install parameters associated with a package

purge_repo(repo_name)[source]

remove references to repo repo_name

>>> waptdb = WaptDB('c:/wapt/db/waptdb.sqlite')
>>> waptdb.purge_repo('main')
query_package_entry(query, args=(), one=False, package_request=None)[source]

Execute the query on the db and try to map result on PackageEntry attributes. Fields which don’t match attributes are added as attributes (and listed in _calc_attributes list)

Args:

query (str): sql query

args (list): parameters of the sql query

package_request (PackageRequest): keep only entries which match package_request

one (bool): if True, return the highest available version (if package_request is not None, use it to compare packages)

Result:

PackageEntry or list of PackageEntry

>>> waptdb = WaptDB(':memory:')
>>> waptdb.add_package_entry(PackageEntry('toto','0',repo='main'))
>>> waptdb.add_package_entry(PackageEntry('dummy','2',repo='main'))
>>> waptdb.add_package_entry(PackageEntry('dummy','1',repo='main'))
>>> waptdb.query_package_entry("select * from wapt_package where package=?",["dummy"])
[PackageEntry('dummy','2'), PackageEntry('dummy','1')]
>>> waptdb.query_package_entry("select * from wapt_package where package=?",["dummy"],one=True)
PackageEntry('dummy','2')
remove_install_status(package=None, package_uuid=None)[source]

Remove status of package installation from localdb

store_setuppy(rowid, setuppy=None, install_params={})[source]

Update status of package installation on localdb

switch_to_explicit_mode(package, user_id)[source]

Set package install mode to manual so that package is not removed when meta packages don’t require it anymore

update_audit_status(rowid, set_status=None, set_output=None, append_line=None, set_last_audit_on=None, set_next_audit_on=None)[source]

Update status of package installation on localdb

update_install_status(rowid, set_status=None, append_line=None, uninstall_key=None, persistent_dir=None)[source]

Update status of package installation on localdb

update_install_status_pid(pid, set_status='ERROR')[source]

Update status of package installation on localdb

upgradeable(include_errors=True, check_version_only=True, include_setup=False)[source]

Return a dictionary of upgradable Package entries

Returns:

dict 'package': [candidates] ordered from most adequate to more generic and

upgradeable_status()[source]

Return a list of upgradable Package entries. (faster than upgradeable() )

Returns:

list of (installed PackageEntry, candidate PackageEntry)

wapt_package_columns = {'architecture': 'varchar', 'audit_schedule': 'varchar', 'categories': 'varchar', 'changelog': 'varchar', 'conflicts': 'varchar', 'depends': 'varchar', 'description': 'varchar', 'editor': 'varchar', 'filename': 'varchar', 'forced_install_on': 'varchar', 'homepage': 'varchar', 'icon_sha256sum': 'varchar', 'id': 'INTEGER PRIMARY KEY AUTOINCREMENT', 'impacted_process': 'varchar', 'installed_size': 'integer', 'keywords': 'varchar', 'licence': 'varchar', 'locale': 'varchar', 'maintainer': 'varchar', 'maturity': 'varchar', 'max_os_version': 'varchar', 'md5sum': 'varchar', 'min_os_version': 'varchar', 'min_wapt_version': 'varchar', 'name': 'varchar', 'package': 'varchar', 'package_uuid': 'varchar', 'priority': 'varchar', 'repo': 'varchar', 'repo_url': 'varchar', 'section': 'varchar', 'signature': 'varchar', 'signature_date': 'varchar', 'signed_attributes': 'varchar', 'signer': 'varchar', 'signer_fingerprint': 'varchar', 'size': 'integer', 'sources': 'varchar', 'target_os': 'varchar', 'valid_from': 'varchar', 'valid_until': 'varchar', 'version': 'varchar'}
class common.WaptHostRepo(url=None, name='wapt-host', verify_cert=None, http_proxy=None, timeout=None, host_id=None, cabundle=None, config=None, section=None, host_key=None, WAPT=None)[source]

Bases: WaptRepo

Dummy http repository for host packages

>>> host_repo = WaptHostRepo(name='wapt-host',host_id=['0D2972AC-0993-0C61-9633-529FB1A177E3','4C4C4544-004E-3510-8051-C7C04F325131'])
>>> host_repo.load_config_from_file(r'C:\Users\htouvet\AppData\Local\waptconsole\waptconsole.ini')
>>> host_repo.packages()
[PackageEntry('0D2972AC-0993-0C61-9633-529FB1A177E3','10') ,
 PackageEntry('4C4C4544-004E-3510-8051-C7C04F325131','30') ]
download_packages(package_requests, target_dir=None, usecache=True, printhook=None)[source]

Download a list of packages from repo

Args:

package_requests (list,PackageEntry): a list of PackageEntry to download

target_dir (str): where to store downloaded Wapt Package files

usecache (bool): whether to try to use cached Wapt files if checksum is ok

printhook (callable): to show progress of download

Returns:

dict: {"downloaded":[local filenames],"skipped":[filenames in cache],"errors":[],"packages":self.packages()}

property host_id
host_package_url(host_id=None)[source]
is_available()[source]

Check if repo is reachable and return creation date of Packages.

Try to access the repo and return last modified date of repo index or None if not accessible

Returns:

str: Iso creation date of remote Package file as returned in http headers

>>> repo = WaptRemoteRepo(name='main',url='https://wapt/wapt',timeout=1)
>>> repo.is_available() <= datetime2isodate()
True
>>> repo = WaptRemoteRepo(name='main',url='https://badwapt/wapt',timeout=1)
>>> repo.is_available() is None
True
load_config(config, section=None)[source]

Load waptrepo configuration from inifile section.

Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file

property repo_url

Repository URL

Fixed url if none is set in wapt-get.ini by querying the server.

The URL is queried once and then cached into a local property.

Returns:

str: url to the repository

>>> repo = WaptRepo(name='wapt',timeout=4)
>>> print(repo.wapt_server)
http://wapt.wapt.fr/
>>> print(repo.repo_url)
http://srvwapt.tranquilit.local/wapt
class common.WaptPackageAuditLogger(console, wapt_context=None, install_id=None, user=None, running_status='RUNNING', exit_status=None, error_status='ERROR')[source]

Bases: LogOutput

Context handler to log all print messages to a wapt package audit log

Args:

console (file) : sys.stderr

wapt_context (Wapt): Wapt instance

install_id (int): name of running or installed package local status where to log status and output

class common.WaptPackageInstallLogger(console, wapt_context=None, install_id=None, user=None, running_status='RUNNING', exit_status='OK', error_status='ERROR')[source]

Bases: LogOutput

Context handler to log all print messages to a wapt package install log

Args:

wapt_context (Wapt): Wapt instance

package_name (str): name of running or installed package local status where to log status and output
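
The general pattern of a context handler which captures print output and tracks a running / exit / error status can be sketched with plain Python. `PrintCapture` below is a hypothetical illustration of that pattern; it stores lines in memory, whereas the WAPT loggers write to the local status database.

```python
import io
import sys

class PrintCapture:
    """Hypothetical context manager: tee print() output and track a status."""

    def __init__(self, console, running_status="RUNNING",
                 exit_status="OK", error_status="ERROR"):
        self.console = console
        self.running_status = running_status
        self.exit_status = exit_status
        self.error_status = error_status
        self.lines = []
        self.status = None

    def write(self, text):
        self.console.write(text)  # still forward to the real console
        if text.strip():
            self.lines.append(text.rstrip("\n"))

    def flush(self):
        self.console.flush()

    def __enter__(self):
        self.status = self.running_status
        self._previous_stdout = sys.stdout
        sys.stdout = self
        return self

    def __exit__(self, exc_type, exc, tb):
        sys.stdout = self._previous_stdout
        self.status = self.error_status if exc_type else self.exit_status
        return False  # never swallow the exception

with PrintCapture(io.StringIO()) as log:
    print("installing")
    print("done")

try:
    with PrintCapture(io.StringIO()) as failed:
        raise RuntimeError("boom")
except RuntimeError:
    pass

print(log.status, log.lines, failed.status)
```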

class common.WaptPackageSessionSetupLogger(console, waptsessiondb, install_id, running_status='RUNNING', exit_status=None, error_status='ERROR')[source]

Bases: LogOutput

Context handler to log all print messages to a wapt package install log

Args:

wapt_context (Wapt): Wapt instance

package_name (str): name of running or installed package local status where to log status and output

class common.WaptRepo(url=None, name='wapt', verify_cert=None, http_proxy=None, timeout=None, cabundle=None, config=None, section=None, WAPT=None)[source]

Bases: WaptRemoteRepo

Gives access to a remote http repository, with a zipped Packages index.

Finds its repo_url based on:

* repo_url explicit setting in ini config section [<name>]
* rules, if some are defined

>>> repo = WaptRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> packages = repo.packages()
>>> len(packages)

property WAPT
as_dict()[source]

returns a dict representation of the repository configuration and parameters

property cached_wapt_repo_url
find_wapt_repo_url()[source]

Find a wapt_repo_url from rules

Returns:

str: URL to the repo.

load_config(config, section=None)[source]

Load waptrepo configuration from inifile section.

Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file

property proxies

dict for http proxies url suitable for requests based on the http_proxy repo attribute

Returns:

dict: {'http':'http://proxy:port','https':'http://proxy:port'}
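
A dict of this shape can be derived from a single proxy URL. The helper below is a minimal sketch; `proxies_from_http_proxy` is a hypothetical name, and returning `None` values when no proxy is set mirrors the `{'http': None, 'https': None}` values shown elsewhere in this reference.

```python
def proxies_from_http_proxy(http_proxy):
    """Build a requests-style proxies dict from one proxy URL (or None)."""
    if http_proxy:
        return {"http": http_proxy, "https": http_proxy}
    return {"http": None, "https": None}

print(proxies_from_http_proxy("http://proxy:3128"))
print(proxies_from_http_proxy(None))
```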

property repo_url

Repository URL

Fixed url if none is set in wapt-get.ini by querying the server.

The URL is queried once and then cached into a local property.

Returns:

str: url to the repository

>>> repo = WaptRepo(name='wapt',timeout=4)
>>> print(repo.wapt_server)
http://wapt.wapt.fr/
>>> print(repo.repo_url)
http://srvwapt.tranquilit.local/wapt
reset_network()[source]

called by wapt when network configuration has changed

property rules
rulesdb()[source]

Get rules from DB

class common.WaptServer(url=None, proxies={'http': None, 'https': None}, timeout=5.0, dnsdomain=None, name='waptserver')[source]

Bases: BaseObjectClass

Manage connection to waptserver

as_dict()[source]
ask_user_password(action=None)[source]

Ask for basic auth if server requires it

auth(action=None, enable_password_callback=True)[source]

Return an authenticator for the action. This is basically a tuple (user,password) or a more sophisticated AuthBase descendent. The motivation for action is to not request password from user if not needed. Or enable kerberos for specific endpoint.

Args:

action (str): for which an authenticator is requested

Returns:

tuple or requests.auth.AuthBase descendant instance

available()[source]
clear_session()[source]
client_auth()[source]

Return SSL pair (cert,key) filenames for client side SSL auth

Returns:

tuple: (cert path,key path,strkeypassword)

get(action, auth=None, timeout=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True, decode_json=True)[source]

Make a get http request to the server, and return result decoded from json. This assumes remo

Args:

action (str): doc part of the url

auth (tuple): authentication passed to requests (user,password)

enable_capture_external_ip (bool) : if true and callback is defined, try to get external IP from X-Remote-IP header and set it through self.capture_external_ip_callback

enable_password_callback (bool) : if true, password callback will be called if needed.

Returns:

dict : response returned from server as json data.

get_private_key_password(location, identity)[source]
get_requests_session(use_ssl_auth=True)[source]
head(action, auth=None, timeout=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True)[source]
load_config(config, section='global')[source]

Load waptserver configuration from inifile

load_config_from_file(config_filename, section='global')[source]

Load waptserver configuration from an inifile located at config_filename

Args:

config_filename (str) : path to wapt inifile

section (str): ini section from which to get parameters. default to 'global'

Returns:

WaptServer: self

post(action, data=None, files=None, auth=None, timeout=None, signature=None, signer=None, content_length=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True, max_retry_count=3)[source]

Post data to waptserver using http POST method

Add a signature to the posted data using host certificate.

Posted Body is gzipped

Args:

action (str): doc part of the url

data (str) : posted data body

files (list or dict) : list of filenames

Returns:

dict : response returned from server as json data.

reset_network()[source]

called by wapt when network configuration has changed

save_server_certificate(server_ssl_dir=None, overwrite=False)[source]

Retrieve certificate of https server for further checks

Args:

server_ssl_dir (str): Directory where to save x509 certificate file

Returns:

str : full path to x509 certificate file.

property server_url

Return fixed url if any

>>> server = WaptServer(timeout=4)
>>> print(server.server_url)
https://wapt.tranquil.it
upload_packages(packages, auth=None, timeout=None, progress_hook=None)[source]

Upload a list of PackageEntry with local wapt build/signed files

Returns:

dict: {'ok','errors'} list of http post upload results

class common.WaptSessionDB(username='')[source]

Bases: WaptBaseDB

add_start_install(package_entry)[source]

Register the start of installation in local db

Returns:

int : rowid of the inserted record

curr_db_version = '20201217'
initdb()[source]

Initialize current sqlite db with empty table and return structure version

is_installed(package, version)[source]
remove_install_status(package)[source]

Remove status of package installation from localdb

>>> wapt = Wapt()
>>> wapt.forget_packages('tis-7zip')
???
remove_obsolete_install_status(installed_packages)[source]

Remove local user status of packages no more installed

update_install_status(rowid, set_status=None, append_line=None)[source]

Update status of package installation on localdb

update_install_status_pid(pid, set_status='ERROR')[source]

Update status of package installation on localdb

common.authorized_packages_for_token(wapt, token, package_requests=None)[source]
common.check_is_member_of(huser, group_name)[source]

Check if a user is a member of a group

Args:

huser (handle) : pywin32

group_name (str) : group

>>> import win32security
>>> hUser = win32security.LogonUser ('technique','tranquilit','xxxxxxx',win32security.LOGON32_LOGON_NETWORK,win32security.LOGON32_PROVIDER_DEFAULT)
>>> check_is_member_of(hUser,'domain admins')
False
common.check_user_membership(user_name, password, domain_name, group_name)[source]

Check if a user is a member of a group

Args:

user_name (str): user
password (str):
domain_name (str): If empty, check local then domain
group_name (str): group

>>> import win32security
>>> hUser = win32security.LogonUser ('technique','tranquilit','xxxxxxx',win32security.LOGON32_LOGON_NETWORK,win32security.LOGON32_PROVIDER_DEFAULT)
>>> check_is_member_of(hUser,'domain admins')
False
common.get_domain_admins_group_name()[source]
Return the localized name of the domain admins group (i.e. "domain admins" or "administrateurs du domaine", with RID -512)

>>> get_domain_admins_group_name()
u'Domain Admins'
common.get_local_admins_group_name()[source]
common.host_ipv4()[source]

Return a list of (iface, mac, {addr, broadcast, netmask})

common.is_system_user()[source]
common.lookup_name_from_rid(domain_controller, rid)[source]
Return the username or group name for a RID (with localization if applicable)

From https://mail.python.org/pipermail/python-win32/2006-May/004655.html

Args:

domain_controller: should be a DC
rid: integer number (512 for domain admins, 513 for domain users, etc.)

>>> lookup_name_from_rid('srvads', DOMAIN_GROUP_RID_ADMINS)
u'Domain Admins'
common.self_service_rules(wapt)[source]

Returns dict of allowed packages for users and groups

common.sid_from_rid(domain_controller, rid)[source]

Return a SID structure based on the supplied domain controller's domain and the supplied rid. rid can be, for example, DOMAIN_GROUP_RID_ADMINS or DOMAIN_GROUP_RID_USERS.
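
The relationship between a domain SID and a RID can be shown with plain strings: an account or group SID is the domain SID with the RID appended as the last sub-authority. The sketch below is illustrative only and works on SID strings, whereas the function above returns a SID structure. The helper name `sid_string_from_rid` and the sample domain SID are hypothetical; the two RID constants use the well-known values 512 and 513.

```python
# Well-known relative identifiers (512 = domain admins, 513 = domain users).
DOMAIN_GROUP_RID_ADMINS = 512
DOMAIN_GROUP_RID_USERS = 513


def sid_string_from_rid(domain_sid, rid):
    """Append rid to a domain SID string such as 'S-1-5-21-x-y-z'."""
    if not domain_sid.startswith('S-1-'):
        raise ValueError('not a SID string: %r' % domain_sid)
    return '%s-%d' % (domain_sid.rstrip('-'), rid)
```

For a hypothetical domain SID `S-1-5-21-1111-2222-3333`, the domain admins group would be `S-1-5-21-1111-2222-3333-512`, whatever the group's localized display name is.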

common.tryurl(url, proxies=None, timeout=5.0, auth=None, verify_cert=False, cert=None)[source]
common.wapt_sources_edit(wapt_sources_dir: str, editor_for_packages: Optional[str] = None) -> str[source]
Utility to open PyScripter or the configured editor with the package sources if it is installed; otherwise, open the wapt package development directory in the System Shell Explorer.

Args:

wapt_sources_dir (str): directory path of the wapt package sources

Returns:

str: sources path

common.win_check_editor_available(editor_for_packages: str = '') -> bool[source]