Files
openfreemap/ssh_lib/utils.py
Zsolt Ero 24dfa2ce37 work
2025-10-16 12:41:41 +02:00

236 lines
5.9 KiB
Python

import os
import secrets
import socket
import string
import sys
from pathlib import Path
import requests
from fabric import Connection
from invoke import UnexpectedExit
def put(
c, local_path, remote_path, permissions=None, user='root', group=None, create_parent_dir=False
):
tmp_path = f'/tmp/fabtmp_{random_string(8)}'
c.put(local_path, tmp_path)
if create_parent_dir:
dirname = os.path.dirname(remote_path)
c.sudo(f'mkdir -p {dirname}')
set_permission(c, dirname, user=user, group=group)
if is_dir(c, remote_path):
if not remote_path.endswith('/'):
remote_path += '/'
filename = os.path.basename(local_path)
remote_path += filename
c.sudo(f"mv '{tmp_path}' '{remote_path}'")
c.sudo(f"rm -rf '{tmp_path}'")
set_permission(c, remote_path, permissions=permissions, user=user, group=group)
def put_dir(
c,
local_dir: Path,
remote_dir,
dir_permissions=None,
file_permissions=None,
user='root',
group=None,
exclude_set=None,
):
"""
copies all files from local path to remote path
not recursive
"""
files = [file for file in local_dir.iterdir() if file.is_file()]
if exclude_set:
files = [f for f in files if f.name not in exclude_set]
c.sudo(f'mkdir -p "{remote_dir}"')
set_permission(c, remote_dir, permissions=dir_permissions, user=user, group=group)
for file in files:
print(f'uploading {remote_dir}/{file.name}')
put(c, file, f'{remote_dir}/{file.name}', file_permissions, user, group)
def put_str(c, remote_path, str_, create_parent_dir=False):
tmp_file = 'tmp.txt'
with open(tmp_file, 'w') as outfile:
outfile.write(str_ + '\n')
put(c, tmp_file, remote_path, create_parent_dir=create_parent_dir)
os.remove(tmp_file)
def file_contains(c, file_path, search_str):
"""Check if a file contains a specific string."""
if not exists(c, file_path):
return False
# Use grep -qF for fixed string search (no regex interpretation)
# -q for quiet (no output), -F for fixed string
result = c.sudo(f"grep -qF '{search_str}' '{file_path}'", warn=True, hide=True)
return result.ok
def append_str(c, remote_path, str_, check_duplicate=False):
"""Append string to file. If check_duplicate=True, only append if string doesn't exist."""
if check_duplicate and file_contains(c, remote_path, str_.strip()):
return False # String already exists, didn't append
tmp_path = f'/tmp/fabtmp_{random_string(8)}'
put_str(c, tmp_path, str_)
sudo_cmd(c, f"cat '{tmp_path}' >> '{remote_path}'")
c.sudo(f'rm -f {tmp_path}')
return True # Successfully appended
def sudo_cmd(c, cmd, *, user=None):
cmd = cmd.replace('"', '\\"')
try:
c.sudo(f'bash -c "{cmd}"', user=user)
except UnexpectedExit as e:
print(f'Command failed: {e.result.command}')
print(f'Error: {e.result.stderr}')
sys.exit(1)
def run_nice(c, cmd):
try:
c.run(cmd)
except UnexpectedExit as e:
print(f'Command failed: {e.result.command}')
print(f'Error: {e.result.stderr}')
sys.exit(1)
def set_permission(c, path, *, permissions=None, user=None, group=None):
if user:
if not group:
group = user
c.sudo(f"chown {user}:{group} '{path}'")
if permissions:
c.sudo(f"chmod {permissions} '{path}'")
def reboot(c):
print('Rebooting')
try:
c.sudo('reboot')
except Exception:
pass
def exists(c, path):
return c.sudo(f"test -e '{path}'", hide=True, warn=True).ok
def is_dir(c, path):
return c.sudo(f"test -d '{path}'", hide=True, warn=True).ok
def random_string(length):
return ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(length))
def ubuntu_release(c):
return c.run('lsb_release -rs').stdout.strip()[:2]
def ubuntu_codename(c):
return c.run('lsb_release -cs').stdout.strip()
def apt_get_update(c):
c.sudo('apt-get update')
def apt_get_install(c, pkgs, warn=False):
c.sudo(
f'DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends {pkgs}',
warn=warn,
echo=True,
)
def apt_get_purge(c, pkgs):
c.sudo(f'DEBIAN_FRONTEND=noninteractive apt-get purge -y {pkgs}')
def apt_get_autoremove(c):
c.sudo('DEBIAN_FRONTEND=noninteractive apt-get autoremove -y')
def get_username(c):
return c.run('whoami').stdout.strip()
def add_user(c, username, passwd=None, uid=None):
uid_str = f'--uid={uid}' if uid else ''
# --disabled-password -> ssh-key login only
c.sudo(f'adduser --disabled-password --gecos "" {uid_str} {username}', warn=True)
if passwd:
sudo_cmd(c, f'echo "{username}:{passwd}" | chpasswd')
def remove_user(c, username):
c.sudo(f'userdel -r {username}', warn=True)
c.sudo(f'rm -rf /home/{username}')
def enable_sudo(c, username, nopasswd=False):
c.sudo(f'usermod -aG sudo {username}')
if nopasswd:
put_str(c, '/etc/sudoers.d/tmp.', f'{username} ALL=(ALL) NOPASSWD:ALL')
set_permission(c, '/etc/sudoers.d/tmp.', permissions='440', user='root')
c.sudo(f'mv /etc/sudoers.d/tmp. /etc/sudoers.d/{username}')
def get_latest_release_github(user, repo):
url = f'https://api.github.com/repos/{user}/{repo}/releases/latest'
r = requests.get(url)
r.raise_for_status()
data = r.json()
assert data['tag_name'] == data['name']
return data['tag_name']
def get_ip_from_ssh_alias(ssh_alias):
"""
Get IP address from SSH config alias.
Args:
ssh_alias: SSH hostname/alias from ~/.ssh/config
Returns:
str: IP address
Raises:
socket.gaierror: If hostname cannot be resolved
"""
# Create connection (doesn't actually connect)
conn = Connection(ssh_alias)
# Get the resolved hostname from SSH config
hostname = conn.host
# Resolve to IP
ip_address = socket.gethostbyname(hostname)
return ip_address