#!/usr/bin/env python3
import sys
import json
import binascii
import base64
from email.message import EmailMessage
from subprocess import Popen, PIPE

import click
import pymysql
import pymysql.cursors
import configparser

class GlobalState():
	"""Shared CLI state: the merged configuration and an open database connection.

	Attributes:
		config: dict of settings. Keys are lower-cased and stripped of the
			``HEDGEDOCUTIL_`` prefix. Values from the config file are loaded
			first and then overridden by every command line option that is
			not ``None``.
		db: the pymysql connection created from ``config``.
	"""

	def __init__(self, options):
		"""Load the configuration, connect to the database and verify its schema.

		Args:
			options: dict of the global click options; ``options['config']``
				is the path of the config file to read.

		Exits the process with status 2 when the connection cannot be
		established or the schema does not match.
		"""
		file_config = self._load_config(options.get('config'))
		self.config = self._clean_config(file_config)
		# Command line / environment options win over the config file.
		self.config.update(self._clean_config(options))
		try:
			self.db = self._get_connection()
		except Exception as e:
			click.echo("Database connection failed: {}".format(repr(e)))
			sys.exit(2)
		self._check_schema()

	def _get_connection(self):
		"""Open a pymysql connection using the db* settings; rows are returned as dicts."""
		return pymysql.connect(host=self.config['dbhost'],
			user=self.config['dbuser'],
			password=self.config['dbpw'],
			database=self.config['dbname'],
			charset='utf8mb4',
			cursorclass=pymysql.cursors.DictCursor)

	def _check_schema(self):
		"""Compare the applied migrations with the configured ``dbschema`` string.

		The names in ``SequelizeMeta`` are sorted and joined with commas; if
		the result differs from ``config['dbschema']`` (or that setting is
		missing) the process exits with status 2.
		"""
		with self.db.cursor() as cursor:
			cursor.execute('SELECT name from SequelizeMeta ORDER BY name ASC')
			schema = ','.join([i['name'] for i in cursor.fetchall()])
		# .get(): an unset dbschema is reported as a mismatch instead of a KeyError traceback.
		if schema != self.config.get('dbschema'):
			click.echo("Unsupportet db schema: {}".format(schema))
			sys.exit(2)

	def _load_config(self, path):
		"""Parse a ``KEY=VALUE`` file into a dict.

		Lines starting with ``#`` and lines without ``=`` are skipped. Only
		the first ``=`` separates key and value, so values may themselves
		contain ``=``. Reading is best effort: a missing or unreadable file
		(or no path at all) yields an empty dict.
		"""
		if not path:
			return {}
		result = {}
		try:
			with open(path, 'r') as f:
				for line in f:
					if line.startswith('#'):
						continue
					key, sep, value = line.rstrip('\r\n').partition('=')
					if not sep:
						continue
					result[key] = value
		except (OSError, UnicodeError):
			return {}
		return result

	def _clean_config(self, d):
		"""Return a copy of ``d`` without ``None`` values and with normalised keys.

		Keys lose the ``HEDGEDOCUTIL_`` prefix and are lower-cased, so the
		config file can use the same names as the environment variables.
		"""
		return {
				k.replace('HEDGEDOCUTIL_','').lower():v
				for k, v in d.items()
				if v is not None
			}

def _default_template_mail():
	"""Return the ``pad_mail_template`` setting of the current click context's state object."""
	return click.get_current_context().obj.config['pad_mail_template']

def _date_json_handler(obj):
	"""``json.dumps`` fallback: use ``isoformat()`` when the object has it, else return it unchanged."""
	if hasattr(obj, 'isoformat'):
		return obj.isoformat()
	return obj

def _decode_nested_json(data, fieldnames):
	"""Decode JSON strings stored in the given fields of each row, in place.

	Args:
		data: iterable of row dicts; ``None`` entries (no row found) are
			left untouched.
		fieldnames: names of the fields holding JSON text.

	Returns:
		The same ``data`` object. Fields that are absent or ``None`` are
		skipped; every other listed field is replaced by ``json.loads`` of
		its value.
	"""
	for row in data:
		if row is None:
			continue
		for fieldname in fieldnames:
			if fieldname not in row or row[fieldname] is None:
				continue
			row[fieldname] = json.loads(row[fieldname])
	return data

# See https://github.com/hedgedoc/hedgedoc/blob/2b8aac289a326468ab4eecb442acd59298541399/lib/models/note.js#L165-L181
# The id used in pad urls is the database id with the dashes removed, hex-decoded and url-safe base64 encoded without padding.
# Whoever came up with this shit should be hit repeatedly with a blunt object until they expire. At the very least you owe me a beer.
def note_id_decode_from_url(input_id):
	"""Turn the unpadded url-safe base64 id from a pad url into the dashed hex id (8-4-4-4-12)."""
	hexdigits = base64.urlsafe_b64decode(input_id + '==').hex()
	bounds = (0, 8, 12, 16, 20, 32)
	return '-'.join(hexdigits[start:end] for start, end in zip(bounds, bounds[1:]))
def note_id_encode_to_url(input_id):
	"""Turn a dashed hex id into the unpadded url-safe base64 form used in pad urls."""
	hexdigits = input_id.replace('-', '')
	raw = binascii.unhexlify(hexdigits.encode())
	encoded = base64.urlsafe_b64encode(raw).decode()
	return encoded.replace('=', '')

def pad_list(db, columns, last_change_older=0, owner=0):
	"""Return the selected columns of all notes, ordered by id.

	``last_change_older`` (days) and ``owner`` (user id) narrow the result;
	the query treats ``0`` / ``""`` respectively as "no filter". The
	``authorship`` column, when selected, is JSON-decoded.
	"""
	# this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
	column_sql = ','.join(columns)
	query = 'SELECT {} FROM Notes WHERE (%s = 0 OR DATEDIFF(NOW(),lastchangeAt) > %s) AND (%s = "" or ownerId = %s) ORDER BY id'.format(column_sql)
	params = (last_change_older, last_change_older, owner, owner)
	with db.cursor() as cursor:
		cursor.execute(query, params)
		rows = cursor.fetchall()
		return _decode_nested_json(rows, ['authorship'])

def pad_get(db, id):
	"""Fetch one note by its database id.

	Returns:
		The row as a dict with ``authorship`` JSON-decoded, or ``None`` when
		no note has this id.
	"""
	with db.cursor() as cursor:
		# (id,) – a real one-element tuple; (id) is just the bare value.
		cursor.execute('SELECT * FROM Notes WHERE id=%s', (id,))
		row = cursor.fetchone()
	if row is None:
		return None
	return _decode_nested_json([row], ['authorship'])[0]

def pad_get_content(db, id):
	"""Return the ``content`` field of the pad with this id ('' if the row has no such key)."""
	pad = pad_get(db, id)
	return pad.get('content', '')

def pad_delete(db, id):
	"""Delete a pad, its revisions and every reference to it in user histories.

	Args:
		db: open pymysql connection.
		id: database id of the pad.

	Raises:
		click.ClickException: if no pad with this id exists.
	"""
	# Explicit existence check; the lookup result itself is not needed.
	if pad_get(db, id) is None:
		raise click.ClickException("No pad with id {}".format(id))
	# User histories reference pads by their url id, not by the database id.
	urlid = note_id_encode_to_url(id)
	# NOTE(review): with PyMySQL >= 1.0 leaving `with db:` closes the connection – confirm this is intended.
	with db:
		with db.cursor() as cursor:
			cursor.execute('DELETE FROM Revisions WHERE noteId=%s', (id,))
			cursor.execute('DELETE FROM Notes WHERE id=%s', (id,))
			cursor.execute('SELECT id,history FROM Users WHERE JSON_SEARCH(history, "one", %s, "", "$[*].id") is not null;', (urlid,))
			# Second cursor so the updates do not disturb iteration over the select result.
			with db.cursor() as usercursor:
				for user in cursor:
					history = json.loads(user['history'] or '[]')
					history = [entry for entry in history if entry.get('id') != urlid]
					usercursor.execute('UPDATE Users set history=%s WHERE id=%s;', (json.dumps(history), user['id']))
		db.commit()

def pad_mail(db, id, template, formats):
	"""Mail a pad to its owner, attaching it in the requested formats.

	Args:
		db: open pymysql connection.
		id: database id of the pad.
		template: mail body; formatted with ``content_dokuwiki`` and
			``content_markdown`` (``None`` for formats not requested).
		formats: collection containing ``'markdown'`` and/or ``'dokuwiki'``.

	Raises:
		click.ClickException: if the pad does not exist, its owner has no
			mail address, or pandoc / sendmail exit with a non-zero status.
	"""
	pad = pad_get(db, id)
	if pad is None:
		raise click.ClickException("No pad with id {}".format(id))

	owner_id = pad.get("ownerId")
	recipient = user_get_mail(db, owner_id) if owner_id else None
	if not recipient:
		raise click.ClickException("No mail address found for the owner of pad {}".format(id))

	# The column may be NULL, in which case the key exists with value None.
	content = pad.get('content') or ''

	converted_formats = {}
	if 'dokuwiki' in formats:
		converter = Popen(["pandoc", "-o", "/dev/stdout", "-s", "-f", "markdown", "-t", "dokuwiki", "-i", "/dev/stdin"], stdin=PIPE, stdout=PIPE)
		out, _ = converter.communicate(content.encode())
		if converter.returncode != 0:
			raise click.ClickException("pandoc failed with exit code {}".format(converter.returncode))
		converted_formats['dokuwiki'] = out.decode()
	if 'markdown' in formats:
		converted_formats['markdown'] = content

	msg = EmailMessage()
	msg["From"] = "hedgedoc"
	msg["To"] = recipient
	msg["Subject"] = "Your pad with title \"{}\"".format(pad.get("title") or "<no title>")
	msg.set_content(template.format(content_dokuwiki=converted_formats.get('dokuwiki'), content_markdown=converted_formats.get('markdown')))
	if 'dokuwiki' in formats:
		msg.add_attachment(converted_formats['dokuwiki'].encode(), maintype="text", subtype="plain", filename="pad-{}.dokuwiki.txt".format(pad.get("title")))
	if 'markdown' in formats:
		msg.add_attachment(converted_formats['markdown'].encode(), maintype="text", subtype="markdown", filename="pad-{}.md".format(pad.get("title")))

	# -t: take the recipients from the message headers; -oi: a line with a single dot does not end the input.
	p = Popen(["sendmail", "-t", "-oi"], stdin=PIPE)
	p.communicate(msg.as_bytes())
	if p.returncode != 0:
		raise click.ClickException("sendmail failed with exit code {}".format(p.returncode))

def user_list(db, columns):
	"""Return the selected columns of all users, ordered by id; ``profile`` and ``history`` are JSON-decoded."""
	# this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
	query = 'SELECT {} FROM Users ORDER BY id'.format(','.join(columns))
	with db.cursor() as cursor:
		cursor.execute(query)
		rows = cursor.fetchall()
		return _decode_nested_json(rows, ['profile', 'history'])

def user_get(db, id):
	"""Fetch one user by id.

	Returns:
		The row as a dict with ``profile`` and ``history`` JSON-decoded, or
		``None`` when no user has this id.
	"""
	with db.cursor() as cursor:
		# (id,) – a real one-element tuple; (id) is just the bare value.
		cursor.execute('SELECT * FROM Users WHERE id=%s', (id,))
		row = cursor.fetchone()
	if row is None:
		return None
	return _decode_nested_json([row], ['profile', 'history'])[0]

def user_get_mail(db, id):
	"""Find a mail address for a user.

	Looks at the user's ``email`` column first, then at the ``email`` and
	``mail`` entries of the decoded profile.

	Returns:
		The first non-empty address found, or ``None`` if the user does not
		exist or has no address in any of these places.
	"""
	user = user_get(db, id)
	if not user:
		return None
	if user.get('email'):
		return user['email']
	profile = user.get('profile')
	if isinstance(profile, dict):
		# .get(): a profile that only carries 'mail' must not fail on the missing 'email' key.
		for key in ('email', 'mail'):
			if profile.get(key):
				return profile[key]
	return None

def tsv_escape(value):
	"""Render a value as a single TSV cell.

	Anything that is not exactly a ``str`` is JSON-encoded first; then
	backslash, tab, newline and carriage return are backslash-escaped.
	"""
	if type(value) is not str:
		value = json.dumps(value, default=_date_json_handler)
	# The backslash must be handled first so the escapes added afterwards are not doubled.
	for char, escaped in (('\\', '\\\\'), ('\t', '\\t'), ('\n', '\\n'), ('\r', '\\r')):
		value = value.replace(char, escaped)
	return value

def output_object(obj):
	"""Print ``obj`` in the output format selected by the global ``--output`` option.

	* ``text``: echo the object as is.
	* ``json``: echo ``json.dumps`` of the object.
	* ``tsv`` / ``tsv-noheader``: a dict is printed as one row and a list of
	  dicts as one row each, using the keys of the first row as columns;
	  any other value is echoed unchanged. Empty values print nothing.
	"""
	outputformat = click.get_current_context().obj.config['output']
	if outputformat == "text":
		click.echo(obj)
	elif outputformat == "json":
		click.echo(json.dumps(obj, default=_date_json_handler))
	elif outputformat in ("tsv", "tsv-noheader"):
		if not obj:
			return
		if isinstance(obj, dict):
			obj = [obj]
		if not isinstance(obj, (list, tuple)):
			# Scalars have no columns: print them and stop instead of falling through to the row handling.
			click.echo(obj)
			return
		header = list(obj[0].keys())
		if outputformat != "tsv-noheader":
			click.echo('\t'.join(header))
		for row in obj:
			click.echo('\t'.join(tsv_escape(row.get(column, '')) for column in header))


# Root command group: collects the global options and builds the shared state for all subcommands.
@click.group()
@click.option('-o', '--output', type=click.Choice(['text', 'json', 'tsv', 'tsv-noheader']), default='text', help='Select output format', show_default=True, show_envvar=True)
@click.option('--config', default='/usr/local/etc/hedgedoc-util/hedgedoc-util.cfg', type=click.Path(), help='Config to load db and template default settings from', show_envvar=True, show_default=True)
@click.option('--dbuser', help='User name used for the db connection', show_envvar=True)
@click.option('--dbpw', help='Password used for the db connection', show_envvar=True)
@click.option('--dbname', help='Database used', show_envvar=True)
@click.option('--dbhost', help='Host the db is running on', show_envvar=True)
@click.option('--dbschema', help='Schema string to verify the db schema against', show_envvar=True)
@click.pass_context
def cli(ctx, **kwargs):
	# kwargs holds all global options; GlobalState merges them with the config file,
	# connects to the database and checks the schema before any subcommand runs.
	ctx.obj = GlobalState(kwargs)

@cli.command(name="test-connect", help="Checks wether the connection to the db works and if we support the used schema")
@click.pass_obj
def _test_connect(obj):
	"""Print "OK" and exit 0 when a connection and a config are present, else report failure and exit 1."""
	if not (obj.db and obj.config):
		click.echo("Connection failed")
		sys.exit(1)
	click.echo("OK")
	sys.exit(0)

@cli.group(name="pad", help="Actions regarding pads")
def cli_pad():
	"""Container group for the ``pad`` subcommands; does nothing itself."""

@cli_pad.command(name="list", help="List all pads")
@click.option('-c', '--columns', default=['id'], type=click.Choice(['id', 'title', 'content', 'ownerId', 'createdAt', 'updatedAt', 'shortid', 'permission', 'viewcount', 'lastchangeuserId', 'lastchangeAt', 'alias', 'deletedAt', 'authorship']), help="Select what data to display. Can be passed multiple times.", multiple=True, show_default=True, show_envvar=True)
@click.option('--last-change-older', type=click.INT, default=0, help='Only list those pads which are older than this value. In days.', show_envvar=True)
@click.option('--owner', type=click.STRING, default='', help='Only list pads with this owner, pass the user id', show_envvar=True)
@click.pass_obj
def _pad_list(obj, columns, last_change_older, owner):
	"""Print the selected columns of all pads matching the optional age and owner filters."""
	output_object(pad_list(obj.db, columns, last_change_older=last_change_older, owner=owner))

@cli_pad.command(name="get", help="Get all data of one pad by its id")
@click.argument('id')
@click.pass_obj
def _pad_get(obj, id):
	"""Look up the pad with the given id and print it in the selected output format."""
	pad = pad_get(obj.db, id)
	output_object(pad)

@cli_pad.command(name="delete", help="Deletes a pad")
@click.argument('id')
@click.pass_obj
def _pad_delete(obj, id):
	"""Delete the pad with the given id using the shared database connection."""
	connection = obj.db
	pad_delete(connection, id)

@cli_pad.command(name="get-content", help="Get the content of one pad by its id")
@click.argument('id')
@click.pass_obj
def _pad_get_content(obj, id):
	"""Print only the content of the pad with the given id."""
	content = pad_get_content(obj.db, id)
	output_object(content)

@cli_pad.command(name="mail", help="Send a pad to its creator via mail")
@click.option('--template', type=click.File(), default=_default_template_mail, help='The template file to use', show_default=True, show_envvar=True)
@click.option('--convert', default=['markdown'], type=click.Choice(['markdown', 'dokuwiki']), help="Add the pad as attachment to the mail. Can be passed multiple times.", multiple=True, show_default=True, show_envvar=True)
@click.argument('id')
@click.pass_obj
def _pad_mail(obj, id, template, convert):
	"""Read the template file and mail the pad in the formats given via --convert."""
	# template is the file object opened by click.File(); pad_mail expects its text.
	pad_mail(obj.db, id, template.read(), formats=convert)

@cli_pad.command(name="urlid-to-id", help="Decode an pad id from urls to a database id")
@click.argument('id')
def _urlid_to_id(id):
	"""Convert the id form used in pad urls to the database id and print it."""
	database_id = note_id_decode_from_url(id)
	output_object(database_id)

@cli_pad.command(name="id-to-urlid", help="Encode an pad id to a from used in urls")
@click.argument('id')
def _id_to_urlid(id):
	"""Convert a database id to the id form used in pad urls and print it."""
	url_id = note_id_encode_to_url(id)
	output_object(url_id)

@cli.group(name="user", help="Actions regarding users")
def cli_user():
	"""Container group for the ``user`` subcommands; does nothing itself."""

@cli_user.command(name="list", help="List all user")
@click.option('-c', '--columns', default=['id'], type=click.Choice(['id', 'profileid', 'profile', 'history', 'createdAt', 'updatedAt', 'accessToken', 'refreshToken', 'email', 'password', 'deleteToken']), help="Select what data to display. Can be passed multiple times.", multiple=True, show_default=True, show_envvar=True)
@click.pass_obj
def _user_list(obj, columns):
	"""Print the selected columns of all users."""
	output_object(user_list(obj.db, columns))

@cli_user.command(name="get", help="Get all data of a user by its id")
@click.argument('id')
@click.pass_obj
def _user_get(obj, id):
	"""Look up the user with the given id and print it in the selected output format."""
	user = user_get(obj.db, id)
	output_object(user)

@cli_user.command(name="get-mail", help="Find the mail adress of a user, will search in multiple fields")
@click.argument('id')
@click.pass_obj
def _user_get_mail(obj, id):
	"""Print the mail address found for the user with the given id."""
	address = user_get_mail(obj.db, id)
	output_object(address)

if __name__ == '__main__':
	# auto_envvar_prefix lets click read option values from environment variables prefixed with HEDGEDOCUTIL.
	cli(auto_envvar_prefix='HEDGEDOCUTIL')