hedgedoc-util.py 11.20 KiB
#!/usr/bin/env python3
import sys
import json
import binascii
import base64
from email.message import EmailMessage
from subprocess import Popen, PIPE
import click
import pymysql
import pymysql.cursors
import configparser
class GlobalState():
    """Shared state for all commands: the merged configuration and the db connection.

    Values read from the config file are overridden by every option in
    ``options`` whose value is not ``None``.
    """

    def __init__(self, options):
        """Load the configuration, connect to the database and verify its schema.

        Args:
            options: Mapping of option names to values; the ``config`` entry is
                used as the path of the config file.

        Exits the process with status 2 when connecting fails or the schema
        check does not pass.
        """
        self.config = self._clean_config(self._load_config(options.get('config')))
        self.config.update(self._clean_config(options))
        try:
            self.db = self._get_connection()
        except Exception as e:
            click.echo("Database connection failed: {}".format(repr(e)))
            sys.exit(2)
        self._check_schema()

    def _get_connection(self):
        """Open a pymysql connection from the ``dbhost``/``dbuser``/``dbpw``/``dbname`` settings."""
        return pymysql.connect(host=self.config['dbhost'],
                               user=self.config['dbuser'],
                               password=self.config['dbpw'],
                               database=self.config['dbname'],
                               charset='utf8mb4',
                               cursorclass=pymysql.cursors.DictCursor)

    def _check_schema(self):
        """Exit with status 2 unless the applied migrations equal the ``dbschema`` setting.

        The migration names from ``SequelizeMeta`` are joined with commas (sorted
        by name) and compared against the configured string.
        """
        with self.db.cursor() as cursor:
            cursor.execute('SELECT name from SequelizeMeta ORDER BY name ASC')
            schema = ','.join([i['name'] for i in cursor.fetchall()])
        # .get(): a missing "dbschema" setting is reported like a mismatch
        # instead of surfacing as a KeyError traceback.
        if schema != self.config.get('dbschema'):
            click.echo("Unsupportet db schema: {}".format(schema))
            sys.exit(2)

    def _load_config(self, path):
        """Parse a ``KEY=VALUE`` file into a dict.

        Lines starting with ``#`` and lines without ``=`` are skipped. Only the
        first ``=`` separates key and value, so values may contain ``=``.

        Returns:
            The parsed mapping, or ``{}`` when the file cannot be read
            (including ``path`` being ``None``).
        """
        result = {}
        try:
            with open(path, 'r') as f:
                for line in f:
                    if line.startswith('#'):
                        continue
                    key, sep, value = line.replace('\n', '').partition('=')
                    if not sep:
                        continue
                    result[key] = value
        except (OSError, TypeError, ValueError):
            # Best effort: an unreadable config simply contributes no settings.
            return {}
        return result

    def _clean_config(self, d):
        """Return ``d`` without ``None`` values and with normalized keys.

        Keys lose the ``HEDGEDOCUTIL_`` marker and are lower-cased.
        """
        return {
            k.replace('HEDGEDOCUTIL_', '').lower(): v
            for k, v in d.items()
            if v is not None
        }
def _default_template_mail():
    """Return the ``pad_mail_template`` setting of the current click context's state object."""
    return click.get_current_context().obj.config['pad_mail_template']
def _date_json_handler(obj):
    """``json.dumps`` fallback: objects offering ``isoformat()`` become that string.

    Anything else is handed back unchanged.
    """
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    return obj
def _decode_nested_json(data, fieldnames):
    """Decode JSON-encoded columns of result rows in place.

    Args:
        data: Iterable of row dicts; ``None`` entries (e.g. the result of a
            ``fetchone()`` that found nothing) are left untouched.
        fieldnames: Names of the columns holding JSON text.

    Returns:
        The same ``data`` object, with every listed column that is present and
        not ``None`` replaced by its parsed value.
    """
    for row in data:
        if row is None:
            continue
        for fieldname in fieldnames:
            if row.get(fieldname) is not None:
                row[fieldname] = json.loads(row[fieldname])
    return data
# See https://github.com/hedgedoc/hedgedoc/blob/2b8aac289a326468ab4eecb442acd59298541399/lib/models/note.js#L165-L181
# Whoever came up with this shit should be hit repeatedly with a blunt object until they expire. At the very least you owe me a beer.
def note_id_decode_from_url(input_id):
    """Turn the id used in pad urls into the dashed hex id.

    ``==`` is appended before url-safe base64 decoding; the decoded bytes are
    hex-encoded and cut into groups of 8-4-4-4-12 characters joined by ``-``.
    """
    padded = input_id + '=='
    hexdigits = binascii.hexlify(base64.urlsafe_b64decode(padded)).decode()
    bounds = ((0, 8), (8, 12), (12, 16), (16, 20), (20, 32))
    return '-'.join(hexdigits[start:end] for start, end in bounds)
def note_id_encode_to_url(input_id):
    """Turn a dashed hex id into the id used in pad urls.

    Dashes are removed, the hex digits are converted to bytes and those are
    url-safe base64 encoded with every ``=`` stripped.
    """
    raw = binascii.unhexlify(input_id.replace('-', '').encode())
    encoded = base64.urlsafe_b64encode(raw).decode()
    return encoded.replace('=', '')
def pad_list(db, columns, last_change_older=0, owner=0):
    """List pads, optionally filtered by age and owner.

    Args:
        db: Open database connection.
        columns: Column names to select; they are interpolated into the SQL
            text and therefore must come from a trusted whitelist.
        last_change_older: When not 0, only pads whose last change (or
            creation, if they were never changed) is more than this many days
            ago are returned.
        owner: When not ``''``, only pads with this ``ownerId`` are returned.
            NOTE(review): the default ``0`` relies on the database treating
            ``0 = ''`` as true - confirm.

    Returns:
        The matching rows ordered by id, with ``authorship`` JSON-decoded.
    """
    # The age condition is closed before "AND": previously the owner filter
    # was AND-ed only onto the last OR branch and so was ignored whenever one
    # of the other branches matched (e.g. always for last_change_older = 0).
    query = (
        f"SELECT {','.join(columns)} FROM Notes WHERE "
        "(%(last_change_older)s = 0"
        " OR DATEDIFF(NOW(), lastchangeAt) > %(last_change_older)s"
        " OR (lastchangeAt IS NULL AND DATEDIFF(NOW(), createdAt) > %(last_change_older)s))"
        " AND (%(owner)s = '' OR ownerId = %(owner)s)"
        " ORDER BY id"
    )
    with db.cursor() as cursor:
        # this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
        cursor.execute(query, {'last_change_older': last_change_older, 'owner': owner})
        return _decode_nested_json(cursor.fetchall(), ['authorship'])
def pad_get(db, id):
    """Fetch one pad by its database id.

    Returns:
        The row as a dict with ``authorship`` JSON-decoded, or ``None`` when no
        pad has this id.
    """
    with db.cursor() as cursor:
        # (id,) - a real one-element tuple rather than a parenthesized scalar.
        cursor.execute('SELECT * FROM Notes WHERE id=%s', (id,))
        row = cursor.fetchone()
    if row is None:
        return None
    return _decode_nested_json([row], ['authorship'])[0]
def pad_get_content(db, id):
    """Return the ``content`` entry of the pad with this id (``''`` if the entry is absent)."""
    pad = pad_get(db, id)
    return pad.get('content', '')
def pad_delete(db, id):
    """Delete a pad, its revisions and every reference to it in user histories.

    Args:
        db: Open database connection.
        id: Database id of the pad.
    """
    # NOTE(review): the result is not needed; the lookup is kept so that the
    # behaviour for ids that cannot be fetched stays exactly as before.
    pad_get(db, id)
    urlid = note_id_encode_to_url(id)
    # NOTE(review): what leaving "with db:" does depends on the PyMySQL
    # version - confirm. The commit is issued while still inside the block.
    with db:
        with db.cursor() as cursor:
            cursor.execute('DELETE FROM Revisions WHERE noteId=%s', (id,))
            cursor.execute('DELETE FROM Notes WHERE id=%s', (id,))
            # Users store their history as a JSON list; find those mentioning the url id.
            cursor.execute('SELECT id,history FROM Users WHERE JSON_SEARCH(history, "one", %s, "", "$[*].id") is not null;', (urlid,))
            with db.cursor() as usercursor:
                for user in cursor:
                    history = json.loads(user['history'] or '[]')
                    history = [entry for entry in history if entry.get('id') != urlid]
                    usercursor.execute('UPDATE Users set history=%s WHERE id=%s;', (json.dumps(history), user['id']))
        db.commit()
def pad_mail(db, id, template, formats):
    """Mail a pad to its owner via ``sendmail``.

    Args:
        db: Open database connection.
        id: Database id of the pad.
        template: Mail body; formatted with ``content_dokuwiki`` and
            ``content_markdown`` (``None`` for formats that were not requested).
        formats: Collection of ``'markdown'`` and/or ``'dokuwiki'``; each
            requested format is also attached to the mail. The dokuwiki
            version is produced by piping the markdown through ``pandoc``.
    """
    pad = pad_get(db, id)
    # "or": the keys exist in the row even when the column is NULL, so a
    # .get() default alone would let None through.
    content = pad.get('content') or ''
    title = pad.get("title") or "<no title>"

    converted_formats = {}
    if 'dokuwiki' in formats:
        converter = Popen(["pandoc", "-o", "/dev/stdout", "-s", "-f", "markdown", "-t", "dokuwiki", "-i", "/dev/stdin"], stdin=PIPE, stdout=PIPE)
        out, _ = converter.communicate(content.encode())
        converted_formats['dokuwiki'] = out.decode()
    if 'markdown' in formats:
        converted_formats['markdown'] = content

    msg = EmailMessage()
    msg["From"] = "hedgedoc"
    msg["To"] = user_get_mail(db, pad["ownerId"])
    msg["Subject"] = "Your pad with title \"{}\"".format(title)
    msg.set_content(template.format(content_dokuwiki=converted_formats.get('dokuwiki'), content_markdown=converted_formats.get('markdown')))
    if 'dokuwiki' in formats:
        msg.add_attachment(converted_formats['dokuwiki'].encode(), maintype="text", subtype="plain", filename="pad-{}.dokuwiki.txt".format(pad.get("title")))
    if 'markdown' in formats:
        msg.add_attachment(converted_formats['markdown'].encode(), maintype="text", subtype="markdown", filename="pad-{}.md".format(pad.get("title")))

    # -t: recipients are taken from the message headers.
    p = Popen(["sendmail", "-t", "-oi"], stdin=PIPE)
    p.communicate(msg.as_bytes())
def user_list(db, columns):
    """Return the selected columns of all users ordered by id.

    ``profile`` and ``history`` are JSON-decoded when selected. ``columns`` is
    interpolated into the SQL text, so it must come from a trusted whitelist.
    """
    selected = ','.join(columns)
    with db.cursor() as cursor:
        # this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
        cursor.execute('SELECT {} FROM Users ORDER BY id'.format(selected))
        rows = cursor.fetchall()
        return _decode_nested_json(rows, ['profile', 'history'])
def user_get(db, id):
    """Fetch one user by id.

    Returns:
        The row as a dict with ``profile`` and ``history`` JSON-decoded, or
        ``None`` when no user has this id.
    """
    with db.cursor() as cursor:
        # (id,) - a real one-element tuple rather than a parenthesized scalar.
        cursor.execute('SELECT * FROM Users WHERE id=%s', (id,))
        row = cursor.fetchone()
    if row is None:
        return None
    return _decode_nested_json([row], ['profile', 'history'])[0]
def user_get_mail(db, id):
    """Find a mail address for a user.

    Looks at the user's ``email`` column first, then at the ``email`` and
    ``mail`` entries of the decoded profile.

    Returns:
        The first non-empty address found, or ``None``.
    """
    user = user_get(db, id)
    if not user:
        return None
    if user.get('email'):
        return user['email']
    # .get(): a profile without an "email" entry must still fall through to
    # "mail" instead of raising KeyError.
    profile = user.get('profile') or {}
    return profile.get('email') or profile.get('mail') or None
def tsv_escape(value):
    """Render a value as one TSV cell.

    Non-string values are serialized with ``json.dumps`` first. Backslash, tab,
    newline and carriage return are then written as ``\\\\``, ``\\t``, ``\\n``
    and ``\\r``.
    """
    if type(value) is not str:
        value = json.dumps(value, default=_date_json_handler)
    escapes = str.maketrans({'\\': '\\\\', '\t': '\\t', '\n': '\\n', '\r': '\\r'})
    return value.translate(escapes)
def output_object(obj):
    """Print ``obj`` in the output format selected in the current click context.

    * ``text``: the object as is.
    * ``json``: ``json.dumps`` with dates rendered through ``_date_json_handler``.
    * ``tsv`` / ``tsv-noheader``: one line per row dict, columns taken from the
      first row; a single dict is treated as one row, nothing is printed for
      empty input, and any other value is printed as is.
    """
    outputformat = click.get_current_context().obj.config['output']
    if outputformat == "text":
        click.echo(obj)
    elif outputformat == "json":
        click.echo(json.dumps(obj, default=_date_json_handler))
    elif outputformat in ("tsv", "tsv-noheader"):
        if not obj:
            return
        if isinstance(obj, dict):
            obj = [obj]
        if not isinstance(obj, list):
            click.echo(obj)
            # Not tabular: stop here instead of falling through to the row
            # handling below, which would fail on e.g. a plain string.
            return
        header = list(obj[0].keys())
        if outputformat != "tsv-noheader":
            click.echo('\t'.join(header))
        for row in obj:
            click.echo('\t'.join(tsv_escape(row.get(column, '')) for column in header))
@click.group()
@click.option(
    '-o', '--output',
    type=click.Choice(['text', 'json', 'tsv', 'tsv-noheader']),
    default='text',
    help='Select output format',
    show_default=True,
    show_envvar=True,
)
@click.option(
    '--config',
    default='/usr/local/etc/hedgedoc-util/hedgedoc-util.cfg',
    type=click.Path(),
    help='Config to load db and template default settings from',
    show_envvar=True,
    show_default=True,
)
@click.option('--dbuser', help='User name used for the db connection', show_envvar=True)
@click.option('--dbpw', help='Password used for the db connection', show_envvar=True)
@click.option('--dbname', help='Database used', show_envvar=True)
@click.option('--dbhost', help='Host the db is running on', show_envvar=True)
@click.option('--dbschema', help='Schema string to verify the db schema against', show_envvar=True)
@click.pass_context
def cli(ctx, **kwargs):
    # Root command group: all options are handed to GlobalState and the
    # resulting object is stored on the context for the sub-commands.
    state = GlobalState(kwargs)
    ctx.obj = state
@cli.command(
    name="test-connect",
    help="Checks wether the connection to the db works and if we support the used schema",
)
@click.pass_obj
def _test_connect(obj):
    # Reports "OK" (exit status 0) when the state object carries both a
    # connection and a configuration, otherwise "Connection failed" (status 1).
    if not (obj.db and obj.config):
        click.echo("Connection failed")
        sys.exit(1)
    click.echo("OK")
    sys.exit(0)
@cli.group(
    name="pad",
    help="Actions regarding pads",
)
def cli_pad():
    # Container for the "pad" sub-commands; does nothing itself.
    pass
@cli_pad.command(name="list", help="List all pads")
@click.option(
    '-c', '--columns',
    default=['id'],
    type=click.Choice(['id', 'title', 'content', 'ownerId', 'createdAt', 'updatedAt', 'shortid', 'permission', 'viewcount', 'lastchangeuserId', 'lastchangeAt', 'alias', 'deletedAt', 'authorship']),
    help="Select what data to display. Can be passed multiple times.",
    multiple=True,
    show_default=True,
    show_envvar=True,
)
@click.option(
    '--last-change-older',
    type=click.INT,
    default=0,
    help='Only list those pads which are older than this value. In days.',
    show_envvar=True,
)
@click.option(
    '--owner',
    type=click.STRING,
    default='',
    help='Only list pads with this owner, pass the user id',
    show_envvar=True,
)
@click.pass_obj
def _pad_list(obj, columns, last_change_older, owner):
    # Prints the pads selected by the filters in the chosen output format.
    pads = pad_list(obj.db, columns, last_change_older=last_change_older, owner=owner)
    output_object(pads)
@cli_pad.command(
    name="get",
    help="Get all data of one pad by its id",
)
@click.argument('id')
@click.pass_obj
def _pad_get(obj, id):
    # Prints the full row of the pad with the given id.
    pad = pad_get(obj.db, id)
    output_object(pad)
@cli_pad.command(
    name="delete",
    help="Deletes a pad",
)
@click.argument('id')
@click.pass_obj
def _pad_delete(obj, id):
    # Thin command wrapper around pad_delete(); prints nothing.
    connection = obj.db
    pad_delete(connection, id)
@cli_pad.command(
    name="get-content",
    help="Get the content of one pad by its id",
)
@click.argument('id')
@click.pass_obj
def _pad_get_content(obj, id):
    # Prints only the content of the pad with the given id.
    content = pad_get_content(obj.db, id)
    output_object(content)
@cli_pad.command(name="mail", help="Send a pad to its creator via mail")
@click.option(
    '--template',
    type=click.File(),
    default=_default_template_mail,
    help='The template file to use',
    show_default=True,
    show_envvar=True,
)
@click.option(
    '--convert',
    default=['markdown'],
    type=click.Choice(['markdown', 'dokuwiki']),
    help="Add the pad as attachment to the mail. Can be passed multiple times.",
    multiple=True,
    show_default=True,
    show_envvar=True,
)
@click.argument('id')
@click.pass_obj
def _pad_mail(obj, id, template, convert):
    # Reads the whole template file and passes its text on as the mail body template.
    body_template = template.read()
    pad_mail(obj.db, id, body_template, formats=convert)
@cli_pad.command(
    name="urlid-to-id",
    help="Decode an pad id from urls to a database id",
)
@click.argument('id')
def _urlid_to_id(id):
    # Prints the dashed hex id belonging to the given url id.
    decoded = note_id_decode_from_url(id)
    output_object(decoded)
@cli_pad.command(
    name="id-to-urlid",
    help="Encode an pad id to a from used in urls",
)
@click.argument('id')
def _id_to_urlid(id):
    # Prints the url id belonging to the given dashed hex id.
    encoded = note_id_encode_to_url(id)
    output_object(encoded)
@cli.group(
    name="user",
    help="Actions regarding users",
)
def cli_user():
    # Container for the "user" sub-commands; does nothing itself.
    pass
@cli_user.command(name="list", help="List all user")
@click.option(
    '-c', '--columns',
    default=['id'],
    type=click.Choice(['id', 'profileid', 'profile', 'history', 'createdAt', 'updatedAt', 'accessToken', 'refreshToken', 'email', 'password', 'deleteToken']),
    help="Select what data to display. Can be passed multiple times.",
    multiple=True,
    show_default=True,
    show_envvar=True,
)
@click.pass_obj
def _user_list(obj, columns):
    # Prints the selected columns of every user.
    users = user_list(obj.db, columns)
    output_object(users)
@cli_user.command(
    name="get",
    help="Get all data of a user by its id",
)
@click.argument('id')
@click.pass_obj
def _user_get(obj, id):
    # Prints the full row of the user with the given id.
    user = user_get(obj.db, id)
    output_object(user)
@cli_user.command(
    name="get-mail",
    help="Find the mail adress of a user, will search in multiple fields",
)
@click.argument('id')
@click.pass_obj
def _user_get_mail(obj, id):
    # Prints the mail address found for the user, as returned by user_get_mail().
    address = user_get_mail(obj.db, id)
    output_object(address)
# Script entry point: run the click group with HEDGEDOCUTIL as the automatic
# environment variable prefix for its options.
if __name__ == "__main__":
    cli(auto_envvar_prefix="HEDGEDOCUTIL")