161 lines
		
	
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			161 lines
		
	
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# from django.test import TestCase
 | 
						|
 | 
						|
from time import sleep
 | 
						|
 | 
						|
import stripe
 | 
						|
from celery.result import AsyncResult
 | 
						|
from django.conf import settings
 | 
						|
from django.core.management import call_command
 | 
						|
from django.test import TestCase, override_settings
 | 
						|
from model_mommy import mommy
 | 
						|
from unittest import skipIf
 | 
						|
 | 
						|
from datacenterlight.models import VMTemplate
 | 
						|
from datacenterlight.tasks import create_vm_task
 | 
						|
from hosting.models import HostingOrder
 | 
						|
from membership.models import StripeCustomer
 | 
						|
from opennebula_api.serializers import VMTemplateSerializer
 | 
						|
from utils.hosting_utils import get_vm_price
 | 
						|
from utils.models import BillingAddress
 | 
						|
from utils.stripe_utils import StripeUtils
 | 
						|
 | 
						|
 | 
						|
@skipIf(
 | 
						|
        settings.STRIPE_API_PRIVATE_KEY_TEST is None or
 | 
						|
        settings.STRIPE_API_PRIVATE_KEY_TEST is "",
 | 
						|
        """Stripe details unavailable, so skipping CeleryTaskTestCase"""
 | 
						|
    )
 | 
						|
class CeleryTaskTestCase(TestCase):
 | 
						|
    @override_settings(
 | 
						|
        task_eager_propagates=True,
 | 
						|
        task_always_eager=True,
 | 
						|
    )
 | 
						|
    def setUp(self):
 | 
						|
        self.customer_password = 'test_password'
 | 
						|
        self.customer_email = 'celery-createvm-task-test@ungleich.ch'
 | 
						|
        self.customer_name = "Monty Python"
 | 
						|
        self.user = {
 | 
						|
            'email': self.customer_email,
 | 
						|
            'name': self.customer_name
 | 
						|
        }
 | 
						|
        self.customer = mommy.make('membership.CustomUser')
 | 
						|
        self.customer.set_password(self.customer_password)
 | 
						|
        self.customer.email = self.customer_email
 | 
						|
        self.customer.save()
 | 
						|
        self.stripe_utils = StripeUtils()
 | 
						|
        stripe.api_key = settings.STRIPE_API_PRIVATE_KEY_TEST
 | 
						|
        self.token = stripe.Token.create(
 | 
						|
            card={
 | 
						|
                "number": '4111111111111111',
 | 
						|
                "exp_month": 12,
 | 
						|
                "exp_year": 2022,
 | 
						|
                "cvc": '123'
 | 
						|
            },
 | 
						|
        )
 | 
						|
        # Run fetchvmtemplates so that we have the VM templates from
 | 
						|
        # OpenNebula
 | 
						|
        call_command('fetchvmtemplates')
 | 
						|
 | 
						|
    @skipIf(
 | 
						|
        settings.OPENNEBULA_DOMAIN is None or settings.OPENNEBULA_DOMAIN is
 | 
						|
        "test_domain",
 | 
						|
        """OpenNebula details unavailable, so skipping test_create_vm_task"""
 | 
						|
    )
 | 
						|
    def test_create_vm_task(self):
 | 
						|
        """Tests the create vm task for monthly subscription
 | 
						|
 | 
						|
        This test is supposed to validate the proper execution
 | 
						|
        of celery create_vm_task on production, as we have no
 | 
						|
        other way to do this.
 | 
						|
        """
 | 
						|
 | 
						|
        # We create a VM from the first template available to DCL
 | 
						|
        vm_template = VMTemplate.objects.all().first()
 | 
						|
        template_data = VMTemplateSerializer(vm_template).data
 | 
						|
 | 
						|
        # The specs of VM that we want to create
 | 
						|
        specs = {
 | 
						|
            'cpu': 1,
 | 
						|
            'memory': 2,
 | 
						|
            'disk_size': 10,
 | 
						|
            'price': 15
 | 
						|
        }
 | 
						|
 | 
						|
        stripe_customer = StripeCustomer.get_or_create(
 | 
						|
            email=self.customer_email,
 | 
						|
            token=self.token
 | 
						|
        )
 | 
						|
        card_details = self.stripe_utils.get_card_details(
 | 
						|
            stripe_customer.stripe_id
 | 
						|
        )
 | 
						|
        card_details_dict = card_details.get('error')
 | 
						|
        self.assertEquals(card_details_dict, None)
 | 
						|
        billing_address_data = {'cardholder_name': self.customer_name,
 | 
						|
                                'postal_code': '1231',
 | 
						|
                                'country': 'CH',
 | 
						|
                                'token': self.token,
 | 
						|
                                'street_address': 'Monty\'s Street',
 | 
						|
                                'city': 'Hollywood'}
 | 
						|
        vm_template_id = template_data.get('id', 1)
 | 
						|
 | 
						|
        cpu = specs.get('cpu')
 | 
						|
        memory = specs.get('memory')
 | 
						|
        disk_size = specs.get('disk_size')
 | 
						|
        amount_to_be_charged = get_vm_price(cpu=cpu, memory=memory,
 | 
						|
                                            disk_size=disk_size)
 | 
						|
        plan_name = StripeUtils.get_stripe_plan_name(cpu=cpu,
 | 
						|
                                                     memory=memory,
 | 
						|
                                                     disk_size=disk_size,
 | 
						|
                                                     price=amount_to_be_charged)
 | 
						|
        stripe_plan_id = StripeUtils.get_stripe_plan_id(cpu=cpu,
 | 
						|
                                                        ram=memory,
 | 
						|
                                                        ssd=disk_size,
 | 
						|
                                                        version=1,
 | 
						|
                                                        app='dcl')
 | 
						|
        stripe_plan = self.stripe_utils.get_or_create_stripe_plan(
 | 
						|
            amount=amount_to_be_charged,
 | 
						|
            name=plan_name,
 | 
						|
            stripe_plan_id=stripe_plan_id)
 | 
						|
        subscription_result = self.stripe_utils.subscribe_customer_to_plan(
 | 
						|
            stripe_customer.stripe_id,
 | 
						|
            [{"plan": stripe_plan.get(
 | 
						|
                'response_object').stripe_plan_id}])
 | 
						|
        stripe_subscription_obj = subscription_result.get('response_object')
 | 
						|
        # Check if the subscription was approved and is active
 | 
						|
        if stripe_subscription_obj is None \
 | 
						|
                or stripe_subscription_obj.status != 'active':
 | 
						|
            msg = subscription_result.get('error')
 | 
						|
            raise Exception("Creating subscription failed: {}".format(msg))
 | 
						|
 | 
						|
        billing_address = BillingAddress(
 | 
						|
            cardholder_name=billing_address_data['cardholder_name'],
 | 
						|
            street_address=billing_address_data['street_address'],
 | 
						|
            city=billing_address_data['city'],
 | 
						|
            postal_code=billing_address_data['postal_code'],
 | 
						|
            country=billing_address_data['country']
 | 
						|
        )
 | 
						|
        billing_address.save()
 | 
						|
 | 
						|
        order = HostingOrder.create(
 | 
						|
            price=specs['price'],
 | 
						|
            vm_id=0,
 | 
						|
            customer=stripe_customer,
 | 
						|
            billing_address=billing_address
 | 
						|
        )
 | 
						|
 | 
						|
        async_task = create_vm_task.delay(
 | 
						|
            vm_template_id, self.user, specs, template_data, order.id
 | 
						|
        )
 | 
						|
        new_vm_id = 0
 | 
						|
        res = None
 | 
						|
        for i in range(0, 10):
 | 
						|
            sleep(5)
 | 
						|
            res = AsyncResult(async_task.task_id)
 | 
						|
            if res.result is not None and res.result > 0:
 | 
						|
                new_vm_id = res.result
 | 
						|
                break
 | 
						|
 | 
						|
        # We expect a VM to be created within 50 seconds
 | 
						|
        self.assertGreater(new_vm_id, 0,
 | 
						|
                           "VM could not be created. res._get_task_meta() = {}"
 | 
						|
                           .format(res._get_task_meta()))
 |