Compare commits

..

7 commits

4 changed files with 199 additions and 35 deletions

View file

@ -58,19 +58,26 @@ def get_zammad_session(impersonated_user=None):
def maybe_create_zammad_user(userdata, zammad_session, attr="login", default=None):
# Map disabled users (in RT) to mock user in Zammad.
#print("*userdata",userdata) #debug
#print("*USERMAP", USERMAP) #debug
if type(userdata) is str and userdata.lower() in USERMAP:
email = userdata
#print("*1") #debug
elif type(userdata) is str or "EmailAddress" not in userdata:
userdata = {
'EmailAddress': 'technik@ungleich.ch',
'RealName': 'Disabled RT'
}
email = userdata["EmailAddress"]
#print("*2") #debug
elif "EmailAddress" in userdata:
email = userdata["EmailAddress"]
#print("*3") #debuig
else:
raise ValueError("Invalid userdata")
#print("*default",default) #debug
#print("*email",email) #debug
# We manually filter out invalid addresses.
if email == "*@*.com":
userdata = {
@ -81,6 +88,7 @@ def maybe_create_zammad_user(userdata, zammad_session, attr="login", default=Non
lowercase_email = email.lower()
#print("*lowercase_email",lowercase_email) #debug
if lowercase_email not in USERMAP:
kwargs = {"email": email}
@ -96,42 +104,30 @@ def maybe_create_zammad_user(userdata, zammad_session, attr="login", default=Non
first = ""
kwargs["lastname"] = last
kwargs["firstname"] = first
#print("*kwargs",kwargs) #debug
try:
#print("*try") #debug
user = zammad_session.user.create(kwargs)
USERMAP[lowercase_email] = user
#print("*try USERMAP") #debug
except:
# The use probably exist already...
result = list(zammad.user.search(lowercase_email))
#print("*ecept")
result = list(zammad.user.search(f'"{lowercase_email}"'))
#print("*ecept result",result) #debug
if len(result) >= 1:
user = next(u for u in result if u['email'] == lowercase_email)
USERMAP[lowercase_email] = user
#print("*except USERMAP",USERMAP) #debug
else:
print(f"Could not create/fetch user {lowercase_email}")
#print("*lastdefault",default) #debug
if default is None:
return USERMAP[lowercase_email][attr]
return USERMAP[lowercase_email].get(attr, default)
def remove_existing_zammad_tickets_for(rt_id, zammad, retries=3):
try:
matching_zammad_tickets= zammad.ticket.search(f"title: \"[RT-{rt_id}]*\"")
if len(matching_zammad_tickets) >= 1:
print(f"Found existing ticket:")
for zt in matching_zammad_tickets:
print(f"{zt["id"]} {zt["title"]}")
print(f"Deleting Zammad ticket {zt["id"]}")
zammad.ticket.destroy(zt["id"])
except:
print(f"Failed to cleanup duplicates for RT-{rt_id} .. ({retries} retries left)")
if retries > 0:
print("Sleeping 5 seconds to give Zammad some air...")
time.sleep(5)
remove_existing_zammad_tickets_for(rt_id, zammad, retries - 1)
else:
traceback.print_exc()
raise RuntimeError
# def ensure_user_is_zammad_agent(user, zammad_session):
# print(f"Promoting user {user} to Agent")
# id = maybe_create_zammad_user(user, zammad_session, "id")
@ -188,9 +184,14 @@ def create_zammad_ticket(id, zammad, h2t, retries=3):
zammad_ticket = zammad.ticket.create(zammad_ticket_template)
print(f"Created Zammad ticket {zammad_ticket['id']} for {label}")
if rt_ticket["ticket"]["Owner"] and rt_ticket["ticket"]["Owner"] != "Nobody":
zammad_owner_id = maybe_create_zammad_user(rt_ticket["ticket"]["Owner"], zammad, "id")
#print("*rt_ticket",rt_ticket) #debug
get_owner = rt_ticket["ticket"]["Owner"]
with open(f"users/{get_owner}", "rb") as handle:
get_owner_data = pickle.load(handle)
#print("*get_owner_data",get_owner_data) #debug
zammad_owner_id = maybe_create_zammad_user(get_owner_data, zammad, "id")
#print("*zammad_owner_id",zammad_owner_id) #debug
zammad.ticket.update(
zammad_ticket["id"], {"owner_id": zammad_owner_id}
)
@ -240,7 +241,9 @@ def create_zammad_ticket(id, zammad, h2t, retries=3):
with open(f"users/{entry_creator}", "rb") as handle:
rt_entry_creator = pickle.load(handle)
#print("*rt_entry_creator",rt_entry_creator) #debug
zammad_entry_creator = maybe_create_zammad_user(rt_entry_creator, zammad)
#print("*zammad_entry_creator",zammad_entry_creator) # debug
entry_content = entry["Content"]
if entry_content == '' and may_contain_entry_content:
file = may_contain_entry_content
@ -256,20 +259,29 @@ def create_zammad_ticket(id, zammad, h2t, retries=3):
"internal": entry["Type"] == "Comment",
"attachments": files,
}
entry_creator_id = maybe_create_zammad_user(zammad_entry_creator, zammad, "id")
entry_creator_id = maybe_create_zammad_user(rt_entry_creator, zammad, "id")
# We temporarly change the ticket's creator to create a new entry
# without giving its author 'Agent' privileges.
restore_creator_to = None
if entry_creator_id != zammad_ticket["customer_id"]:
#print("*t1_1 restore_creator_to",restore_creator_to) #debug
#print("*t1_2 entry_creator_id",entry_creator_id) #debug
#print("*t1_3 customer_id",zammad_ticket["customer_id"]) #debug
zammad.ticket.update(zammad_ticket["id"], {"customer_id": entry_creator_id})
restore_creator_to = zammad_ticket["customer_id"]
zammad_ticket_id = zammad_ticket["id"]
print(f"-> Adding a new entry/comment to ticket {zammad_ticket_id} ({zammad_entry_creator})...")
#print("*t2_1",restore_creator_to) #debug
#print("*t2_2",entry_creator_id) #debug
#print("*t2_3",zammad_entry_creator) #debug
#print("*t2_4",zammad_entry_template) #debug
TicketArticle(get_zammad_session(zammad_entry_creator)).create(zammad_entry_template)
if restore_creator_to != None:
#print("*t3_1",restore_creator_to) #debug
#print("*t3_2",entry_creator_id) #debug
zammad.ticket.update(zammad_ticket["id"], {"customer_id": restore_creator_to})
# Returns the new Zammad ticket ID.
@ -300,15 +312,24 @@ if not os.path.exists("rt2zammad.json"):
with open("rt2zammad.json") as handle:
config = json.load(handle)
#import_start = config["rt_start"]
#import_end = config["rt_end"]
if len(sys.argv) != 3:
print(len(sys.argv))
print("It needs ticket range to import ex. start, end")
sys.exit()
else:
import_start = int(sys.argv[1])
import_end = int(sys.argv[2])
print(f"start : {import_start} end : {import_end}")
h2t = html2text.HTML2Text()
zammad = get_zammad_session()
source_directory = "tickets/"
if len(sys.argv) >= 2:
source_directory = sys.argv[1]
if not os.path.isdir(source_directory):
print(f"Could not find source directory {source_directory}. Exiting.")
sys.exit(1)
#this_page = zammad.user.all()
#for user in this_page:
# print(user)
os.makedirs("users", exist_ok=True)
os.makedirs("tickets", exist_ok=True)
@ -316,14 +337,20 @@ os.makedirs("attachments", exist_ok=True)
os.makedirs("failed", exist_ok=True)
os.makedirs("processed", exist_ok=True)
ticket_ids = os.listdir(source_directory)
print(f"Found {len(ticket_ids)} tickets on filesystem (source directory: {source_directory})")
ticket_ids = os.listdir("tickets/")
print(f"Found {len(ticket_ids)} tickets on filesystem.")
ticket_ids = list(map(int, ticket_ids))
ticket_ids.sort()
ticket_ids = list(map(int, ticket_ids[import_start-1:import_end]))
for id in ticket_ids:
try:
remove_existing_zammad_tickets_for(id, zammad, 5)
print(id)
matching_zammad_tickets = zammad.ticket.search(f"title: \"\[RT-{id}\]*\"")
print(len(matching_zammad_tickets))
if len(matching_zammad_tickets) >= 1:
print(f"Found duplicates: {id}")
continue
zammad_ticket_id = create_zammad_ticket(id, zammad, h2t, 5)
dumpfile = f"processed/{id}"
with open(dumpfile, "w") as handle:

12
rt2fs
View file

@ -50,7 +50,17 @@ os.makedirs("users", exist_ok=True)
os.makedirs("tickets", exist_ok=True)
os.makedirs("attachments", exist_ok=True)
ticket_ids = range(config["rt_start"], config["rt_end"])
#ticket_ids = range(config["rt_start"], config["rt_end"])
if len(sys.argv) != 3:
print(len(sys.argv))
print("It needs ticket range to import ex. start, end")
sys.exit()
else:
export_start = int(sys.argv[1])
export_end = int(sys.argv[2])
print(f"start : {export_start} end : {export_end}")
ticket_ids = range(export_start, export_end + 1)
def dump(id, retries=3):
print(f"Dumping RT#{id}")

View file

@ -59,11 +59,13 @@ def get_zammad_session(impersonated_user=None):
def remove_duplicates_for(rt_id, zammad, retries=0):
try:
matching_zammad_tickets= zammad.ticket.search(f"title: \"\[RT-{rt_id}\]*\"")
#matching_zammad_tickets= zammad.ticket.search('number:16014')
#print(matching_zammad_tickets[0])
matching_zammad_ids = []
if len(matching_zammad_tickets) >= 2:
print(f"Found duplicates:")
for zt in matching_zammad_tickets:
print(f"{zt["id"]} {zt["title"]}")
#print(f"{zt["rt_id"]} {zt["title"]}")
matching_zammad_ids.append(zt["id"])
if len(matching_zammad_ids) >= 2:
@ -94,6 +96,15 @@ if not os.path.exists("rt2zammad.json"):
with open("rt2zammad.json") as handle:
config = json.load(handle)
if len(sys.argv) != 3:
print(len(sys.argv))
print("It needs ticket range to import ex. start, end")
sys.exit()
else:
import_start = int(sys.argv[1])
import_end = int(sys.argv[2])
print(f"start : {import_start} end : {import_end}")
h2t = html2text.HTML2Text()
zammad = get_zammad_session()
@ -104,6 +115,7 @@ ticket_ids = os.listdir("tickets/")
print(f"Found {len(ticket_ids)} tickets on filesystem.")
ticket_ids = list(map(int, ticket_ids))
ticket_ids.sort()
ticket_ids = list(map(int, ticket_ids[import_start-1:import_end]))
for id in ticket_ids:
print(f"Processing RT-{id}...")

115
zammad-remove-ticket Executable file
View file

@ -0,0 +1,115 @@
#!/usr/bin/env python
import base64
import json
import os
import pickle
import sys
import html2text
import traceback
import time
from zammad_py import ZammadAPI
from zammad_py.api import Resource, TagList, TicketArticle
TEMPLATE = """{
"zammad_url": "",
"zammad_user": "",
"zammad_password": "",
"rt_url": "",
"rt_user": "",
"rt_pass": "",
"rt_start": 1,
"rt_end": 1000,
}
"""
STATUSMAP = {"new": 1, "open": 2, "resolved": 4, "rejected": 4, "deleted": 4}
USERMAP = {}
ZAMMAD_SESSIONS = {}
### helpers ###
def get_zammad_session(impersonated_user=None):
if impersonated_user in ZAMMAD_SESSIONS:
return ZAMMAD_SESSIONS[impersonated_user]
else:
kwargs = {}
if impersonated_user:
kwargs["on_behalf_of"] = impersonated_user
session = ZammadAPI(
url=config["zammad_host"],
username=config["zammad_user"],
password=config["zammad_password"],
**kwargs,
)
ZAMMAD_SESSIONS[impersonated_user or config["zammad_user"]] = session
return session
def remove_tickets_for(rt_id, zammad, retries=0):
try:
matching_zammad_tickets= zammad.ticket.search(f"title: \"\[RT-{rt_id}\]*\"")
#matching_zammad_tickets= zammad.ticket.search('number:16014')
#print(matching_zammad_tickets[0])
matching_zammad_ids = []
if len(matching_zammad_tickets) >= 1:
print(f"Found ticket:")
for zt in matching_zammad_tickets:
#print(f"{zt["rt_id"]} {zt["title"]}")
matching_zammad_ids.append(zt["id"])
if len(matching_zammad_ids) >= 1:
matching_zammad_ids.sort()
#matching_zammad_ids.pop()
for zt_id in matching_zammad_ids:
print(f"Deleting Zammad ticket {zt_id}")
zammad.ticket.destroy(zt_id)
except:
print(f"Failed to process RT-{rt_id} .. ({retries} retries left)")
if retries > 0:
print("Sleeping 5 seconds to give Zammad some air...")
time.sleep(5)
remove_tickets_for(id, retries - 1)
else:
traceback.print_exc()
raise RuntimeError
### main logic ###
if not os.path.exists("rt2zammad.json"):
print("Missing rt2zammad.json!")
print("Create one based on following template:")
print(TEMPLATE)
sys.exit(1)
with open("rt2zammad.json") as handle:
config = json.load(handle)
if len(sys.argv) != 3:
print(len(sys.argv))
print("It needs ticket range to import ex. start, end")
sys.exit()
else:
import_start = int(sys.argv[1])
import_end = int(sys.argv[2])
print(f"start : {import_start} end : {import_end}")
h2t = html2text.HTML2Text()
zammad = get_zammad_session()
os.makedirs("tickets", exist_ok=True)
ticket_ids = os.listdir("tickets/")
print(f"Found {len(ticket_ids)} tickets on filesystem.")
ticket_ids = list(map(int, ticket_ids))
ticket_ids.sort()
ticket_ids = list(map(int, ticket_ids[import_start-1:import_end]))
for id in ticket_ids:
print(f"Processing RT-{id}...")
remove_tickets_for(id, zammad)