scripts -> modules

This commit is contained in:
Zsolt Ero
2024-08-29 16:33:59 +02:00
parent 7196e15837
commit 66d0bdc515
54 changed files with 65 additions and 52 deletions

View File

@@ -0,0 +1,9 @@
from pathlib import Path
if Path('/data/ofm/config').exists():
OFM_CONFIG_DIR = Path('/data/ofm/config')
else:
OFM_CONFIG_DIR = Path(__file__).parent.parent.parent.parent / 'config'
assert OFM_CONFIG_DIR.exists()

View File

@@ -0,0 +1,109 @@
from pprint import pprint
import requests
# docs: https://api.cloudflare.com/
def cloudflare_get(path: str, params: dict, cloudflare_api_token: str):
headers = {'Authorization': f'Bearer {cloudflare_api_token}'}
res = requests.get(
f'https://api.cloudflare.com/client/v4{path}', headers=headers, params=params
)
res.raise_for_status()
data = res.json()
assert data['success'] is True
return data
def get_zone_id(domain, cloudflare_api_token: str):
data = cloudflare_get(
'/zones', params=dict(name=domain), cloudflare_api_token=cloudflare_api_token
)
assert len(data['result']) == 1
zone_info = data['result'][0]
return zone_info['id']
def get_dns_records_round_robin(zone_id, cloudflare_api_token: str) -> dict:
data = cloudflare_get(
f'/zones/{zone_id}/dns_records',
params=dict(per_page=5000),
cloudflare_api_token=cloudflare_api_token,
)
records = data['result']
data = {}
for r in records:
if r['type'] != 'A':
continue
data.setdefault(r['name'], [])
data[r['name']].append(dict(content=r['content'], id=r['id']))
return data
def set_records_round_robin(
zone_id,
*,
name: str,
host_ip_set: set,
ttl: int = 1,
proxied: bool,
comment: str = None,
cloudflare_api_token: str,
) -> bool:
headers = {'Authorization': f'Bearer {cloudflare_api_token}'}
dns_records = get_dns_records_round_robin(zone_id, cloudflare_api_token=cloudflare_api_token)
current_records = dns_records.get(name, [])
current_ips = {r['content'] for r in current_records}
if current_ips == host_ip_set:
print(f'No need to update records: {name} currently set: {sorted(current_ips)}')
return False
# changing records
# delete all current records first
for r in current_records:
delete_record(zone_id, id_=r['id'], cloudflare_api_token=cloudflare_api_token)
# create new records
for ip in host_ip_set:
print(f'Creating record: {name} {ip}')
json_data = dict(
type='A',
name=name,
content=ip,
ttl=ttl,
proxied=proxied,
comment=comment,
)
res = requests.post(
f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records',
headers=headers,
json=json_data,
)
res.raise_for_status()
data = res.json()
assert data['success'] is True
return True
def delete_record(zone_id, *, id_: str, cloudflare_api_token: str):
headers = {'Authorization': f'Bearer {cloudflare_api_token}'}
print(f'Deleting record: {id_}')
res = requests.delete(
f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{id_}',
headers=headers,
json={},
)
res.raise_for_status()
data = res.json()
assert data['success'] is True

View File

@@ -0,0 +1,54 @@
from io import BytesIO
from pathlib import Path
import pycurl
def pycurl_status(url, domain, host_ip):
"""
Uses pycurl to make a HTTPS HEAD request using custom resolving,
checks if the status code is 200
"""
c = pycurl.Curl()
c.setopt(c.URL, url)
# linux needs CA certs specified manually
if Path('/etc/ssl/certs/ca-certificates.crt').exists():
c.setopt(c.CAINFO, '/etc/ssl/certs/ca-certificates.crt')
c.setopt(c.RESOLVE, [f'{domain}:443:{host_ip}'])
c.setopt(c.NOBODY, True)
c.setopt(c.TIMEOUT, 5)
c.perform()
status_code = c.getinfo(c.RESPONSE_CODE)
c.close()
return status_code
def pycurl_get(url, domain, host_ip):
"""
Uses pycurl to make a HTTPS GET request using custom resolving,
checks if the status code is 200, and returns the content.
"""
buffer = BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, url)
# linux needs CA certs specified manually
if Path('/etc/ssl/certs/ca-certificates.crt').exists():
c.setopt(c.CAINFO, '/etc/ssl/certs/ca-certificates.crt')
c.setopt(c.RESOLVE, [f'{domain}:443:{host_ip}'])
c.setopt(c.WRITEDATA, buffer)
c.setopt(c.TIMEOUT, 5)
c.perform()
status_code = c.getinfo(c.RESPONSE_CODE)
c.close()
if status_code != 200:
raise ValueError(f'status code: {status_code}')
return buffer.getvalue().decode('utf8')

View File

@@ -0,0 +1,16 @@
import requests
def telegram_send_message(message, bot_token, chat_id):
print(message)
url = f'https://api.telegram.org/bot{bot_token}/sendMessage'
payload = {'chat_id': chat_id, 'text': message}
response = requests.post(url, data=payload)
if response.status_code == 200:
print(' Message sent successfully!')
else:
print(' Failed to send message:', response.text)