Updated test_create_vm_task for monthly subscriptions

This commit is contained in:
PCoder 2017-08-24 11:50:43 +05:30
parent 0256b0bf03
commit 3111255673

View file

@ -50,7 +50,12 @@ class CeleryTaskTestCase(TestCase):
call_command('fetchvmtemplates')
def test_create_vm_task(self):
"""Tests the create vm task."""
"""Tests the create vm task for monthly subscription
This test is supposed to validate the proper execution
of celery create_vm_task on production, as we have no
other way to do this.
"""
# We create a VM from the first template available to DCL
vm_template = VMTemplate.objects.all().first()
@ -60,13 +65,16 @@ class CeleryTaskTestCase(TestCase):
specs = {
'cpu': 1,
'memory': 2,
'disk_size': 10,
'price': 15,
'disk_size': 10
}
stripe_customer = StripeCustomer.get_or_create(
email=self.customer_email,
token=self.token)
card_details = self.stripe_utils.get_card_details(
stripe_customer.stripe_id,
self.token)
card_details_dict = card_details.get('response_object')
billing_address = BillingAddress(
cardholder_name=self.customer_name,
postal_code='1232',
@ -83,28 +91,44 @@ class CeleryTaskTestCase(TestCase):
billing_address_id = billing_address.id
vm_template_id = template_data.get('id', 1)
final_price = specs.get('price')
# Make stripe charge to a customer
stripe_utils = StripeUtils()
charge_response = stripe_utils.make_charge(
amount=final_price,
customer=stripe_customer.stripe_id)
cpu = specs.get('cpu')
memory = specs.get('memory')
disk_size = specs.get('disk_size')
amount_to_be_charged = (cpu * 5) + (memory * 2) + (disk_size * 0.6)
plan_name = "{cpu} Cores, {memory} GB RAM, {disk_size} GB SSD".format(
cpu=cpu,
memory=memory,
disk_size=disk_size)
# Check if the payment was approved
if not charge_response.get(
'response_object'):
msg = charge_response.get('error')
raise Exception("make_charge failed: {}".format(msg))
stripe_plan_id = StripeUtils.get_stripe_plan_id(cpu=cpu,
ram=memory,
ssd=disk_size,
version=1,
app='dcl')
stripe_plan = self.stripe_utils.get_or_create_stripe_plan(
amount=amount_to_be_charged,
name=plan_name,
stripe_plan_id=stripe_plan_id)
subscription_result = self.stripe_utils.subscribe_customer_to_plan(
stripe_customer.stripe_id,
[{"plan": stripe_plan.get(
'response_object').stripe_plan_id}])
stripe_subscription_obj = subscription_result.get('response_object')
# Check if the subscription was approved and is active
if stripe_subscription_obj is None or \
stripe_subscription_obj.status != 'active':
msg = subscription_result.get('error')
raise Exception("Creating subscription failed: {}".format(msg))
charge = charge_response.get('response_object')
async_task = create_vm_task.delay(vm_template_id, self.user,
specs,
template_data,
stripe_customer.id,
billing_address_data,
billing_address_id,
charge)
stripe_subscription_obj,
card_details_dict)
new_vm_id = 0
res = None
for i in range(0, 10):