import uuid
from time import sleep
from unittest import skipIf
from unittest.mock import patch

import stripe
from celery.result import AsyncResult
from django.conf import settings
from django.http.request import HttpRequest
from django.test import Client
from django.test import TestCase, override_settings
from model_mommy import mommy

from datacenterlight.models import StripePlan
from membership.models import StripeCustomer
from utils.stripe_utils import StripeUtils
from .tasks import save_ssh_key

# True when either SSH-key test setting is None or the empty string.
# Evaluated once at import time because ``skipIf`` needs a plain boolean.
_SSH_KEY_SETTINGS_MISSING = (
    settings.TEST_MANAGE_SSH_KEY_PUBKEY in (None, "")
    or settings.TEST_MANAGE_SSH_KEY_HOST in (None, "")
)


class BaseTestCase(TestCase):
    """
    Base class to initialize the test cases
    """

    def setUp(self):
        """Create two users, their logged-in clients and a bare request."""
        # Password
        self.dummy_password = 'test_password'

        # Users
        self.customer, self.another_customer = mommy.make(
            'membership.CustomUser', _quantity=2)
        for user in (self.customer, self.another_customer):
            user.set_password(self.dummy_password)
            user.save()

        # Stripe mocked data
        self.stripe_mocked_customer = self.customer_stripe_mocked_data()

        # Clients
        self.customer_client = self.get_client(self.customer)
        self.another_customer_client = self.get_client(self.another_customer)

        # Request Object
        self.request = HttpRequest()
        self.request.META['SERVER_NAME'] = 'ungleich.com'
        self.request.META['SERVER_PORT'] = '80'

    def get_client(self, user):
        """
        Authenticate a user and return the client

        :param user: user whose ``email`` is used together with
            ``self.dummy_password`` to log in
        :return: a ``django.test.Client`` on which ``login`` was called
        """
        client = Client()
        client.login(email=user.email, password=self.dummy_password)
        return client

    def customer_stripe_mocked_data(self):
        """Return a static dict shaped like a Stripe customer payload."""
        return {
            "id": "cus_8R1y9UWaIIjZqr",
            "object": "customer",
            "currency": "usd",
            "default_source": "card_18A9up2eZvKYlo2Cq2RJMGeF",
            "email": "vmedixtodd+1@gmail.com",
            "livemode": False,
            "metadata": {},
            "shipping": None,
            "sources": {
                "object": "list",
                "data": [{
                    "id": "card_18A9up2eZvKYlo2Cq2RJMGeF",
                    "object": "card",
                    "brand": "Visa",
                    "country": "US",
                    "customer": "cus_8R1y9UWaIIjZqr",
                    "cvc_check": "pass",
                    "dynamic_last4": None,
                    "exp_month": 12,
                    "exp_year": 2018,
                    "funding": "credit",
                    "last4": "4242",
                }]
            }
        }

    def setup_view(self, view, *args, **kwargs):
        """Mimic as_view() returned callable, but returns view instance.

        args and kwargs are the same you would pass to ``reverse()``

        :param view: view instance to prepare
        :return: the same ``view`` with ``request``, ``args``, ``kwargs``
            set and ``config`` reset to ``None``
        """
        view.request = self.request
        view.args = args
        view.kwargs = kwargs
        view.config = None
        return view


class TestStripeCustomerDescription(TestCase):
    """
    A class to test setting the description field of the stripe customer
    https://stripe.com/docs/api#metadata
    """

    def setUp(self):
        """Create a user and two Stripe card tokens via the Stripe API.

        ``self.token`` is built from card ``4111111111111111`` and
        ``self.failed_token`` from card ``4000000000000341``; the latter is
        used by the failed-payment test in the subclass.
        """
        self.customer_password = 'test_password'
        self.customer_email = 'test@ungleich.ch'
        self.customer_name = "Monty Python"
        self.customer = mommy.make('membership.CustomUser')
        self.customer.set_password(self.customer_password)
        self.customer.email = self.customer_email
        self.customer.save()
        self.stripe_utils = StripeUtils()
        stripe.api_key = settings.STRIPE_API_PRIVATE_KEY_TEST
        self.token = stripe.Token.create(
            card={
                "number": '4111111111111111',
                "exp_month": 12,
                "exp_year": 2022,
                "cvc": '123'
            },
        )
        self.failed_token = stripe.Token.create(
            card={
                "number": '4000000000000341',
                "exp_month": 12,
                "exp_year": 2022,
                "cvc": '123'
            },
        )

    def test_creating_stripe_customer(self):
        """The created customer's description equals the customer name."""
        stripe_data = self.stripe_utils.create_customer(
            self.token.id, self.customer.email, self.customer_name)
        self.assertEqual(stripe_data.get('error'), None)
        customer_data = stripe_data.get('response_object')
        self.assertEqual(customer_data.description, self.customer_name)


class StripePlanTestCase(TestStripeCustomerDescription):
    """
    A class to test Stripe plans

    Inherits ``setUp`` (and the inherited test) from
    ``TestStripeCustomerDescription``.
    """

    def test_get_stripe_plan_id_string(self):
        """Plan ids are built from app, version, cpu, ram, ssd and hdd."""
        plan_id_string = StripeUtils.get_stripe_plan_id(
            cpu=2, ram=20, ssd=100, version=1, app='dcl')
        self.assertEqual(plan_id_string, 'dcl-v1-cpu-2-ram-20gb-ssd-100gb')
        plan_id_string = StripeUtils.get_stripe_plan_id(
            cpu=2, ram=20, ssd=100, version=1, app='dcl', hdd=200)
        self.assertEqual(plan_id_string,
                         'dcl-v1-cpu-2-ram-20gb-ssd-100gb-hdd-200gb')

    def test_get_or_create_plan(self):
        """get_or_create_stripe_plan returns a StripePlan and no error."""
        stripe_plan = self.stripe_utils.get_or_create_stripe_plan(
            2000, "test plan 1", stripe_plan_id='test-plan-1')
        self.assertIsNone(stripe_plan.get('error'))
        self.assertIsInstance(stripe_plan.get('response_object'), StripePlan)

    @patch('utils.stripe_utils.logger')
    def test_create_duplicate_plans_error_handling(self, mock_logger):
        """
        Test details:
            1. Create a test plan in Stripe with a particular id
            2. Try to recreate the plan with the same id
            3. This creates a Stripe error, the code should be able to
               handle the error

        :param mock_logger: mock replacing ``utils.stripe_utils.logger``,
            used to check the debug message that is logged
        """
        unique_id = uuid.uuid4().hex
        new_plan_id_str = 'test-plan-{}'.format(unique_id)
        plan_name = "test plan {}".format(unique_id)
        stripe_plan = self.stripe_utils.get_or_create_stripe_plan(
            2000, plan_name, stripe_plan_id=new_plan_id_str)
        self.assertIsInstance(stripe_plan.get('response_object'), StripePlan)
        self.assertEqual(stripe_plan.get('response_object').stripe_plan_id,
                         new_plan_id_str)

        # Test creating the same plan again and expect the
        # PLAN_EXISTS_ERROR_MSG.
        # We first delete the local Stripe Plan, so that the code tries to
        # create a new plan in Stripe
        StripePlan.objects.filter(
            stripe_plan_id=new_plan_id_str).all().delete()

        stripe_plan_1 = self.stripe_utils.get_or_create_stripe_plan(
            2000, plan_name, stripe_plan_id=new_plan_id_str)
        mock_logger.debug.assert_called_with(
            self.stripe_utils.PLAN_EXISTS_ERROR_MSG.format(new_plan_id_str))
        self.assertIsInstance(stripe_plan_1.get('response_object'),
                              StripePlan)
        self.assertEqual(stripe_plan_1.get('response_object').stripe_plan_id,
                         new_plan_id_str)

        # Delete the test stripe plan that we just created
        delete_result = self.stripe_utils.delete_stripe_plan(new_plan_id_str)
        self.assertIsInstance(delete_result, dict)
        self.assertEqual(delete_result.get('response_object'), True)

    @patch('utils.stripe_utils.logger')
    def test_delete_unexisting_plan_should_fail(self, mock_logger):
        """Deleting an unknown plan id returns False and logs a message.

        :param mock_logger: mock replacing ``utils.stripe_utils.logger``
        """
        plan_id = 'crazy-plan-id-that-does-not-exist'
        result = self.stripe_utils.delete_stripe_plan(plan_id)
        self.assertIsInstance(result, dict)
        self.assertEqual(result.get('response_object'), False)
        mock_logger.debug.assert_called_with(
            self.stripe_utils.PLAN_DOES_NOT_EXIST_ERROR_MSG.format(plan_id))

    def test_subscribe_customer_to_plan(self):
        """Subscribing with the valid token yields an active subscription."""
        stripe_plan = self.stripe_utils.get_or_create_stripe_plan(
            2000, "test plan 1", stripe_plan_id='test-plan-1')
        stripe_customer = StripeCustomer.get_or_create(
            email=self.customer_email, token=self.token)
        plan_id = stripe_plan.get('response_object').stripe_plan_id
        result = self.stripe_utils.subscribe_customer_to_plan(
            stripe_customer.stripe_id, [{"plan": plan_id}])
        self.assertIsInstance(result.get('response_object'),
                              stripe.Subscription)
        self.assertIsNone(result.get('error'))
        self.assertEqual(result.get('response_object').get('status'),
                         'active')

    def test_subscribe_customer_to_plan_failed_payment(self):
        """Subscribing with the failing token yields an error, no object."""
        stripe_plan = self.stripe_utils.get_or_create_stripe_plan(
            2000, "test plan 1", stripe_plan_id='test-plan-1')
        stripe_customer = StripeCustomer.get_or_create(
            email=self.customer_email, token=self.failed_token)
        plan_id = stripe_plan.get('response_object').stripe_plan_id
        result = self.stripe_utils.subscribe_customer_to_plan(
            stripe_customer.stripe_id, [{"plan": plan_id}])
        # The second positional argument of assertIsNone is the failure
        # message, not an expected value, so none is passed here.
        self.assertIsNone(result.get('response_object'))
        self.assertIsNotNone(result.get('error'))


class SaveSSHKeyTestCase(TestCase):
    """
    A test case to test the celery save_ssh_key task
    """

    # The task result is polled this many times, sleeping this long before
    # each poll, before the test gives up.
    POLL_ATTEMPTS = 10
    POLL_INTERVAL_SECONDS = 5

    # NOTE(review): decorating setUp means these overrides are only active
    # while setUp itself runs, not during the test methods - confirm whether
    # that is intended before moving the decorator to the class.
    @override_settings(
        task_eager_propagates=True,
        task_always_eager=True,
    )
    def setUp(self):
        """Read the public key and host to test against from settings."""
        self.public_key = settings.TEST_MANAGE_SSH_KEY_PUBKEY
        self.hosts = settings.TEST_MANAGE_SSH_KEY_HOST

    def _wait_for_bool_result(self, async_task):
        """Poll ``async_task`` until its result is a bool.

        :param async_task: object returned by ``save_ssh_key.delay``
        :return: the boolean task result, or ``None`` when no boolean
            result showed up within ``POLL_ATTEMPTS`` polls
        """
        for _ in range(self.POLL_ATTEMPTS):
            sleep(self.POLL_INTERVAL_SECONDS)
            result = AsyncResult(async_task.task_id).result
            if isinstance(result, bool):
                return result
        return None

    @skipIf(_SSH_KEY_SETTINGS_MISSING,
            "Skipping test_save_ssh_key_add because either host or public "
            "key were not specified or were empty")
    def test_save_ssh_key_add(self):
        """The task reports success when adding the public key."""
        async_task = save_ssh_key.delay(
            [self.hosts], [{'value': self.public_key, 'state': True}])
        save_ssh_key_result = self._wait_for_bool_result(async_task)
        self.assertIsNotNone(save_ssh_key_result,
                             "save_ssh_key_result is None")
        self.assertTrue(save_ssh_key_result, "save_ssh_key_result is False")

    @skipIf(_SSH_KEY_SETTINGS_MISSING,
            "Skipping test_save_ssh_key_remove because either host or "
            "public key were not specified or were empty")
    def test_save_ssh_key_remove(self):
        """The task reports success when removing the public key."""
        async_task = save_ssh_key.delay(
            [self.hosts], [{'value': self.public_key, 'state': False}])
        save_ssh_key_result = self._wait_for_bool_result(async_task)
        self.assertIsNotNone(save_ssh_key_result,
                             "save_ssh_key_result is None")
        self.assertTrue(save_ssh_key_result, "save_ssh_key_result is False")