Source
Get Source Providers
API
GET: /provider?environment_id=&project_id=&limit=10&offset=20
Description
This API fetches the list of source providers (New & Existing Connection)
UI

File Path
provider/views.py
Class
class ProviderListCreateUpdateView(viewsets.ModelViewSet):
Code
def list(self, request, *args, **kwargs):
    """List source providers from the local database or the master API.

    If the JSON request body carries a truthy boolean ``master_api`` flag
    and the caller is a superuser, the call is proxied to
    ``<master_url>/provider`` and the decoded payload is returned.
    Otherwise ``Provider`` rows are filtered by the optional query
    parameters ``provider_type``, ``provider_category``, ``provider_plan``
    and ``provider_source``, ordered by ``created_at`` descending, and
    paginated with limit/offset.

    Args:
        request: Incoming request. ``request.body`` may hold a JSON object
            with a ``master_api`` key; ``request.GET`` holds the filters
            and the pagination parameters.
        *args: Unused positional arguments.
        **kwargs: Extra keyword arguments; they are forwarded as
            ``Provider.objects.filter`` lookups alongside the query
            parameter filters.

    Returns:
        The result of ``ResponseUtils.simple_response`` on success,
        ``ResponseUtils.bad_request`` when the master API answers with a
        non-200 status, or ``ResponseUtils.failed_dependency`` for an
        invalid/unauthorised ``master_api`` flag or any raised exception.
    """
    response = {
        "error": None,
        "data": None
    }
    try:
        paginator = LimitOffsetPagination()
        master_api = None
        if request.body:
            master_api = json.loads(request.body).get("master_api", "")

        if master_api and not isinstance(master_api, bool):
            response["error"] = "master_api can only contain True/False as boolean value"
            return ResponseUtils.failed_dependency(response)

        if master_api:
            if not request.user.is_superuser:
                response["error"] = "Got master_api flag but you does not have permission to use master api. To use client api remove master_api flag"
                return ResponseUtils.failed_dependency(response)
            url = f"{get_secret('master_url')}/provider"
            # Keep the upstream reply in its own name: rebinding `response`
            # here would make the except-handler below index into a
            # requests.Response instead of the envelope dict.
            # NOTE(review): the query string is not forwarded and no
            # timeout is set on this call - confirm both are intended.
            master_response = requests.request("GET", url)
            payload = json.loads(master_response.text)
            if master_response.status_code == 200:
                return ResponseUtils.simple_response(payload)
            return ResponseUtils.bad_request(payload)

        filters = dict(kwargs)
        for field in (
            "provider_type",
            "provider_category",
            "provider_plan",
            "provider_source",
        ):
            raw_value = request.GET.get(field)
            if not raw_value:
                continue
            if isinstance(raw_value, list):
                filters[f"{field}__in"] = raw_value
            else:
                # Drops the first and last character before splitting on
                # commas; presumably the value arrives as "[a,b]" -
                # TODO confirm against the UI.
                filters[f"{field}__in"] = raw_value[1:-1].split(',')

        self.queryset = Provider.objects.filter(
            **filters).order_by('-created_at')
        serialized = self.serializer_class(self.queryset, many=True)
        page = paginator.paginate_queryset(serialized.data, request)
        return ResponseUtils.simple_response(
            paginator.get_paginated_response(page).data)
    except Exception as err:
        # "error" matches the other failure paths; "message" is kept so
        # clients already reading that key keep working.
        response["error"] = str(err)
        response["message"] = str(err)
        return ResponseUtils.failed_dependency(response)

Overview
Validate connection name
API
POST: /validate_connection_name?environment_id=&project_id=&app_id=
Description
This API validates the uniqueness of the connection name
Params
{
connection_name
tenant_id
}
UI
Last updated