forked from fnux/rt2zammad
326 lines
11 KiB
Python
Executable file
326 lines
11 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import pickle
|
|
import sys
|
|
import html2text
|
|
import traceback
|
|
import time
|
|
|
|
from zammad_py import ZammadAPI
|
|
from zammad_py.api import Resource, TagList, TicketArticle
|
|
|
|
TEMPLATE = """{
|
|
"zammad_url": "",
|
|
"zammad_user": "",
|
|
"zammad_password": "",
|
|
"rt_url": "",
|
|
"rt_user": "",
|
|
"rt_pass": "",
|
|
"rt_start": 1,
|
|
"rt_end": 1000,
|
|
}
|
|
"""
|
|
|
|
COMMENT_TEMPLATE = """
|
|
Ticket imported from Request Tracker (RT-#{numerical_id})
|
|
|
|
URL: https://support.ungleich.ch/Ticket/Display.html?id={numerical_id}
|
|
Created: {Created}
|
|
Resolved: {Resolved}
|
|
"""
|
|
|
|
STATUSMAP = {"new": 1, "open": 2, "resolved": 4, "rejected": 4, "deleted": 4}
|
|
USERMAP = {}
|
|
|
|
ZAMMAD_SESSIONS = {}
|
|
|
|
### helpers ###
|
|
|
|
def get_zammad_session(impersonated_user=None):
|
|
if impersonated_user in ZAMMAD_SESSIONS:
|
|
return ZAMMAD_SESSIONS[impersonated_user]
|
|
else:
|
|
kwargs = {}
|
|
if impersonated_user:
|
|
kwargs["on_behalf_of"] = impersonated_user
|
|
session = ZammadAPI(
|
|
url=config["zammad_host"],
|
|
username=config["zammad_user"],
|
|
password=config["zammad_password"],
|
|
**kwargs,
|
|
)
|
|
ZAMMAD_SESSIONS[impersonated_user or config["zammad_user"]] = session
|
|
|
|
return session
|
|
|
|
def maybe_create_zammad_user(userdata, zammad_session, attr="login", default=None):
|
|
# Map disabled users (in RT) to mock user in Zammad.
|
|
if type(userdata) is str and userdata.lower() in USERMAP:
|
|
email = userdata
|
|
elif type(userdata) is str or "EmailAddress" not in userdata:
|
|
userdata = {
|
|
'EmailAddress': 'technik@ungleich.ch',
|
|
'RealName': 'Disabled RT'
|
|
}
|
|
email = userdata["EmailAddress"]
|
|
elif "EmailAddress" in userdata:
|
|
email = userdata["EmailAddress"]
|
|
else:
|
|
raise ValueError("Invalid userdata")
|
|
|
|
# We manually filter out invalid addresses.
|
|
if email == "*@*.com":
|
|
userdata = {
|
|
'EmailAddress': 'technik@ungleich.ch',
|
|
'RealName': 'Disabled RT'
|
|
}
|
|
email = userdata["EmailAddress"]
|
|
|
|
|
|
lowercase_email = email.lower()
|
|
|
|
if lowercase_email not in USERMAP:
|
|
kwargs = {"email": email}
|
|
kwargs.update(config["userdata"])
|
|
if "RealName" in userdata:
|
|
realname = userdata["RealName"]
|
|
if ", " in realname.strip():
|
|
last, first = realname.split(", ", 1)
|
|
elif " " in realname.strip():
|
|
first, last = realname.split(None, 1)
|
|
else:
|
|
last = realname
|
|
first = ""
|
|
kwargs["lastname"] = last
|
|
kwargs["firstname"] = first
|
|
try:
|
|
user = zammad_session.user.create(kwargs)
|
|
USERMAP[lowercase_email] = user
|
|
except:
|
|
# The use probably exist already...
|
|
result = list(zammad.user.search(lowercase_email))
|
|
if len(result) >= 1:
|
|
user = next(u for u in result if u['email'] == lowercase_email)
|
|
USERMAP[lowercase_email] = user
|
|
else:
|
|
print(f"Could not create/fetch user {lowercase_email}")
|
|
|
|
if default is None:
|
|
return USERMAP[lowercase_email][attr]
|
|
|
|
return USERMAP[lowercase_email].get(attr, default)
|
|
|
|
# def ensure_user_is_zammad_agent(user, zammad_session):
|
|
# print(f"Promoting user {user} to Agent")
|
|
# id = maybe_create_zammad_user(user, zammad_session, "id")
|
|
# roles = maybe_create_zammad_user(user, zammad_session, "roles")
|
|
# if "Agent" not in roles:
|
|
# zammad_session.user.update(
|
|
# id, {"roles": roles.append("Agent")}
|
|
# )
|
|
|
|
def create_zammad_ticket(id, zammad, h2t, retries=3):
|
|
try:
|
|
with open(f"tickets/{id}", "rb") as handle:
|
|
rt_ticket = pickle.load(handle)
|
|
|
|
label = "RT-{}".format(rt_ticket["ticket"]["original_id"])
|
|
creator = rt_ticket["ticket"]["Creator"]
|
|
with open(f"users/{creator}", "rb") as handle:
|
|
rt_creator = pickle.load(handle)
|
|
|
|
# The RT user object doesn't always contain everything (e.g. disabled users).
|
|
if "EmailAddress" in rt_creator:
|
|
creator = rt_creator["EmailAddress"]
|
|
print(f"Importing {label} ({creator})")
|
|
|
|
zammad_creator = maybe_create_zammad_user(rt_creator, zammad)
|
|
|
|
# Make sure that we use what we created in Zammad, independently of what we
|
|
# had in RT.
|
|
creator = zammad_creator
|
|
|
|
zammad_ticket_template = {
|
|
"title": "[RT-{}] {}".format(id, rt_ticket["ticket"]["Subject"]),
|
|
"group": "Users",
|
|
"customer": creator,
|
|
"note": "RT-import:{}".format(rt_ticket["ticket"]["original_id"]),
|
|
"article": {
|
|
"subject": rt_ticket["ticket"]["Subject"],
|
|
},
|
|
}
|
|
|
|
# Ticket creation.
|
|
merged = False
|
|
if rt_ticket["ticket"]["original_id"] != rt_ticket["ticket"]["numerical_id"]:
|
|
merged = True
|
|
zammad_ticket_template["state_id"] = STATUSMAP["resolved"]
|
|
zammad_ticket_template["article"]["body"] = "RT ticket merged into {}".format(
|
|
rt_ticket["ticket"]["numerical_id"]
|
|
)
|
|
zammad_ticket = zammad.ticket.create(zammad_ticket_template)
|
|
else:
|
|
zammad_ticket_template["state_id"] = STATUSMAP[rt_ticket["ticket"]["Status"]]
|
|
body = rt_ticket["history"][0]["Content"] or 'RT Import: empty comment.'
|
|
zammad_ticket_template["article"]["body"] = body
|
|
zammad_ticket = zammad.ticket.create(zammad_ticket_template)
|
|
|
|
print(f"Created Zammad ticket {zammad_ticket['id']} for {label}")
|
|
if rt_ticket["ticket"]["Owner"] and rt_ticket["ticket"]["Owner"] != "Nobody":
|
|
zammad_owner_id = maybe_create_zammad_user(rt_ticket["ticket"]["Owner"], zammad, "id")
|
|
zammad.ticket.update(
|
|
zammad_ticket["id"], {"owner_id": zammad_owner_id}
|
|
)
|
|
|
|
# Ignore comments for merged tickets.
|
|
if merged:
|
|
return zammad_ticket["id"]
|
|
|
|
# Internal note regarding the RT-Zammad import.
|
|
TicketArticle(zammad).create(
|
|
{
|
|
"ticket_id": zammad_ticket["id"],
|
|
"body": COMMENT_TEMPLATE.format(**rt_ticket["ticket"]),
|
|
"internal": True,
|
|
}
|
|
)
|
|
|
|
# Comments/notes within the ticket.
|
|
for entry in rt_ticket["history"]:
|
|
if entry["Type"] not in ("Correspond", "Comment"):
|
|
continue
|
|
|
|
# Attachments.
|
|
files = []
|
|
may_contain_entry_content = None
|
|
for a, title in entry["Attachments"]:
|
|
with open(f"attachments/{id}/{a}", "rb") as handle:
|
|
data = pickle.load(handle)
|
|
|
|
if data["Filename"] == "signature.asc":
|
|
continue
|
|
|
|
file_template = {
|
|
"filename": data["Filename"],
|
|
"data": base64.b64encode(data["Content"]).decode("utf-8"),
|
|
"mime-type": data["ContentType"],
|
|
}
|
|
|
|
if data["Filename"] == '':
|
|
if may_contain_entry_content is None:
|
|
may_contain_entry_content = file_template
|
|
else:
|
|
files.append(file_template)
|
|
|
|
# Comment/note.
|
|
entry_creator = entry["Creator"]
|
|
with open(f"users/{entry_creator}", "rb") as handle:
|
|
rt_entry_creator = pickle.load(handle)
|
|
|
|
zammad_entry_creator = maybe_create_zammad_user(rt_entry_creator, zammad)
|
|
entry_content = entry["Content"]
|
|
if entry_content == '' and may_contain_entry_content:
|
|
file = may_contain_entry_content
|
|
entry_content = base64.b64decode(file["data"]).decode("utf-8")
|
|
if file["mime-type"]:
|
|
entry_content = h2t.handle(entry_content)
|
|
if entry_content.strip() == '':
|
|
entry_content = "RT: Empty content."
|
|
|
|
zammad_entry_template = {
|
|
"ticket_id": zammad_ticket["id"],
|
|
"body": entry_content,
|
|
"internal": entry["Type"] == "Comment",
|
|
"attachments": files,
|
|
}
|
|
entry_creator_id = maybe_create_zammad_user(zammad_entry_creator, zammad, "id")
|
|
|
|
# We temporarly change the ticket's creator to create a new entry
|
|
# without giving its author 'Agent' privileges.
|
|
restore_creator_to = None
|
|
if entry_creator_id != zammad_ticket["customer_id"]:
|
|
zammad.ticket.update(zammad_ticket["id"], {"customer_id": entry_creator_id})
|
|
restore_creator_to = zammad_ticket["customer_id"]
|
|
|
|
zammad_ticket_id = zammad_ticket["id"]
|
|
print(f"-> Adding a new entry/comment to ticket {zammad_ticket_id} ({zammad_entry_creator})...")
|
|
TicketArticle(get_zammad_session(zammad_entry_creator)).create(zammad_entry_template)
|
|
|
|
if restore_creator_to != None:
|
|
zammad.ticket.update(zammad_ticket["id"], {"customer_id": restore_creator_to})
|
|
|
|
# Returns the new Zammad ticket ID.
|
|
return zammad_ticket["id"]
|
|
|
|
except KeyboardInterrupt:
|
|
print("Received keyboard interrupt. Exiting.")
|
|
sys.exit()
|
|
except:
|
|
print(f"Failed to import RT-{id} .. ({retries} retries left)")
|
|
print("Sleeping 5 seconds to give Zammad some air...")
|
|
time.sleep(5)
|
|
|
|
if retries > 0:
|
|
create_zammad_ticket(id, zammad, h2t, retries - 1)
|
|
else:
|
|
traceback.print_exc()
|
|
raise RuntimeError
|
|
|
|
### main logic ###
|
|
|
|
if not os.path.exists("rt2zammad.json"):
|
|
print("Missing rt2zammad.json!")
|
|
print("Create one based on following template:")
|
|
print(TEMPLATE)
|
|
sys.exit(1)
|
|
|
|
with open("rt2zammad.json") as handle:
|
|
config = json.load(handle)
|
|
|
|
#import_start = config["rt_start"]
|
|
#import_end = config["rt_end"]
|
|
|
|
if len(sys.argv) != 3:
|
|
print(len(sys.argv))
|
|
print("It needs ticket range to import ex. start, end")
|
|
sys.exit()
|
|
else:
|
|
import_start = int(sys.argv[1])
|
|
import_end = int(sys.argv[2])
|
|
print(f"start : {import_start} end : {import_end}")
|
|
|
|
h2t = html2text.HTML2Text()
|
|
zammad = get_zammad_session()
|
|
|
|
os.makedirs("users", exist_ok=True)
|
|
os.makedirs("tickets", exist_ok=True)
|
|
os.makedirs("attachments", exist_ok=True)
|
|
os.makedirs("failed", exist_ok=True)
|
|
os.makedirs("processed", exist_ok=True)
|
|
|
|
ticket_ids = os.listdir("tickets/")
|
|
print(f"Found {len(ticket_ids)} tickets on filesystem.")
|
|
ticket_ids = list(map(int, ticket_ids))
|
|
ticket_ids.sort()
|
|
ticket_ids = list(map(int, ticket_ids[import_start-1:import_end]))
|
|
|
|
for id in ticket_ids:
|
|
try:
|
|
matching_zammad_tickets= zammad.ticket.search(f"title: \"\[RT-{id}\]*\"")
|
|
if len(matching_zammad_tickets) >= 1:
|
|
print(f"Found duplicates: {id}")
|
|
continue
|
|
zammad_ticket_id = create_zammad_ticket(id, zammad, h2t, 5)
|
|
dumpfile = f"processed/{id}"
|
|
with open(dumpfile, "w") as handle:
|
|
handle.write(f"{zammad_ticket_id}")
|
|
except SystemExit:
|
|
sys.exit()
|
|
except:
|
|
print(f"Failed to import RT#{id}")
|
|
dumpfile = f"failed/{id}"
|
|
with open(dumpfile, "wb") as handle:
|
|
traceback.print_exc(file=handle)
|