diff --git a/openml/_api/__init__.py b/openml/_api/__init__.py new file mode 100644 index 000000000..881f40671 --- /dev/null +++ b/openml/_api/__init__.py @@ -0,0 +1,8 @@ +from openml._api.runtime.core import APIContext + + +def set_api_version(version: str, *, strict: bool = False) -> None: + api_context.set_version(version=version, strict=strict) + + +api_context = APIContext() diff --git a/openml/_api/clients/__init__.py b/openml/_api/clients/__init__.py new file mode 100644 index 000000000..8a5ff94e4 --- /dev/null +++ b/openml/_api/clients/__init__.py @@ -0,0 +1,6 @@ +from .http import HTTPCache, HTTPClient + +__all__ = [ + "HTTPCache", + "HTTPClient", +] diff --git a/openml/_api/clients/http.py b/openml/_api/clients/http.py new file mode 100644 index 000000000..4e126ee92 --- /dev/null +++ b/openml/_api/clients/http.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import TYPE_CHECKING, Any +from urllib.parse import urlencode, urljoin, urlparse + +import requests +from requests import Response + +from openml.__version__ import __version__ + +if TYPE_CHECKING: + from openml._api.config import DelayMethod + + +class HTTPCache: + def __init__(self, *, path: Path, ttl: int) -> None: + self.path = path + self.ttl = ttl + + def get_key(self, url: str, params: dict[str, Any]) -> str: + parsed_url = urlparse(url) + netloc_parts = parsed_url.netloc.split(".")[::-1] + path_parts = parsed_url.path.strip("/").split("/") + + filtered_params = {k: v for k, v in params.items() if k != "api_key"} + params_part = [urlencode(filtered_params)] if filtered_params else [] + + return str(Path(*netloc_parts, *path_parts, *params_part)) + + def _key_to_path(self, key: str) -> Path: + return self.path.joinpath(key) + + def load(self, key: str) -> Response: + path = self._key_to_path(key) + + if not path.exists(): + raise FileNotFoundError(f"Cache directory not found: {path}") + + meta_path = path / "meta.json" + headers_path = path / "headers.json" + body_path = path / "body.bin" + + if not (meta_path.exists() and headers_path.exists() and body_path.exists()): + raise FileNotFoundError(f"Incomplete cache at {path}") + + with meta_path.open("r", encoding="utf-8") as f: + meta = json.load(f) + + created_at = meta.get("created_at") + if created_at is None: + raise ValueError("Cache metadata missing 'created_at'") + + if time.time() - created_at > self.ttl: + raise TimeoutError(f"Cache expired for {path}") + + with headers_path.open("r", encoding="utf-8") as f: + headers = json.load(f) + + body = body_path.read_bytes() + + response = Response() + response.status_code = meta["status_code"] + response.url = meta["url"] + response.reason = meta["reason"] + response.headers = headers + response._content = body + response.encoding = meta["encoding"] + + return response + + def save(self, key: str, response: Response) -> None: + path = self._key_to_path(key) + path.mkdir(parents=True, exist_ok=True) + + (path / "body.bin").write_bytes(response.content) + + with (path / "headers.json").open("w", encoding="utf-8") as f: + json.dump(dict(response.headers), f) + + meta = { + "status_code": response.status_code, + "url": response.url, + "reason": response.reason, + "encoding": response.encoding, + "elapsed": response.elapsed.total_seconds(), + "created_at": time.time(), + "request": { + "method": response.request.method if response.request else None, + "url": response.request.url if response.request else None, + "headers": dict(response.request.headers) if response.request else None, + "body": response.request.body if response.request else None, + }, + } + + with (path / "meta.json").open("w", encoding="utf-8") as f: + json.dump(meta, f) + + +class HTTPClient: + def __init__( # noqa: PLR0913 + self, + *, + server: str, + base_url: str, + api_key: str, + timeout: int, + retries: int, + delay_method: DelayMethod, + delay_time: int, + cache: HTTPCache | None = None, + ) -> None: + self.server = server + self.base_url = base_url + self.api_key = api_key + self.timeout = timeout + self.retries = retries + self.delay_method = delay_method + self.delay_time = delay_time + self.cache = cache + + self.headers: dict[str, str] = {"user-agent": f"openml-python/{__version__}"} + + def request( + self, + method: str, + path: str, + *, + use_cache: bool = False, + use_api_key: bool = False, + **request_kwargs: Any, + ) -> Response: + url = urljoin(self.server, urljoin(self.base_url, path)) + + # prepare params + params = request_kwargs.pop("params", {}).copy() + if use_api_key: + params["api_key"] = self.api_key + + # prepare headers + headers = request_kwargs.pop("headers", {}).copy() + headers.update(self.headers) + + timeout = request_kwargs.pop("timeout", self.timeout) + + if use_cache and self.cache is not None: + cache_key = self.cache.get_key(url, params) + try: + return self.cache.load(cache_key) + except (FileNotFoundError, TimeoutError): + pass # cache miss or expired, continue + except Exception: + raise # propagate unexpected cache errors + + response = requests.request( + method=method, + url=url, + params=params, + headers=headers, + timeout=timeout, + **request_kwargs, + ) + + if use_cache and self.cache is not None: + self.cache.save(cache_key, response) + + return response + + def get( + self, + path: str, + *, + use_cache: bool = False, + use_api_key: bool = False, + **request_kwargs: Any, + ) -> Response: + return self.request( + method="GET", + path=path, + use_cache=use_cache, + use_api_key=use_api_key, + **request_kwargs, + ) + + def post( + self, + path: str, + **request_kwargs: Any, + ) -> Response: + return self.request( + method="POST", + path=path, + use_cache=False, + use_api_key=True, + **request_kwargs, + ) + + def delete( + self, + path: str, + **request_kwargs: Any, + ) -> Response: + return self.request( + method="DELETE", + path=path, + use_cache=False, + use_api_key=True, + **request_kwargs, + ) diff --git a/openml/_api/clients/minio.py b/openml/_api/clients/minio.py new file mode 100644 index 000000000..e69de29bb diff --git a/openml/_api/config.py b/openml/_api/config.py new file mode 100644 index 000000000..aa153a556 --- /dev/null +++ b/openml/_api/config.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum + + +class DelayMethod(str, Enum): + HUMAN = "human" + ROBOT = "robot" + + +@dataclass +class APIConfig: + server: str + base_url: str + api_key: str + timeout: int = 10 # seconds + + +@dataclass +class APISettings: + v1: APIConfig + v2: APIConfig + + +@dataclass +class ConnectionConfig: + retries: int = 3 + delay_method: DelayMethod = DelayMethod.HUMAN + delay_time: int = 1 # seconds + + +@dataclass +class CacheConfig: + dir: str = "~/.openml/cache" + ttl: int = 60 * 60 * 24 * 7 # one week + + +@dataclass +class Settings: + api: APISettings + connection: ConnectionConfig + cache: CacheConfig + + +settings = Settings( + api=APISettings( + v1=APIConfig( + server="https://www.openml.org/", + base_url="api/v1/xml/", + api_key="...", + ), + v2=APIConfig( + server="http://127.0.0.1:8001/", + base_url="", + api_key="...", + ), + ), + connection=ConnectionConfig(), + cache=CacheConfig(), +) diff --git a/openml/_api/resources/__init__.py b/openml/_api/resources/__init__.py new file mode 100644 index 000000000..b1af3c1a8 --- /dev/null +++ b/openml/_api/resources/__init__.py @@ -0,0 +1,4 @@ +from openml._api.resources.datasets import DatasetsV1, DatasetsV2 +from openml._api.resources.tasks import TasksV1, TasksV2 + +__all__ = ["DatasetsV1", "DatasetsV2", "TasksV1", "TasksV2"] diff --git a/openml/_api/resources/base.py b/openml/_api/resources/base.py new file mode 100644 index 000000000..aea1213fc --- /dev/null +++ b/openml/_api/resources/base.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + import pandas as pd + from _api.http import HTTPClient + from requests import Response + + from openml.datasets.dataset import OpenMLDataset + from openml.tasks.task import OpenMLTask, TaskType + + +class ResourceAPI: + def __init__(self, http: HTTPClient): + self._http = http + + +class DatasetsAPI(ResourceAPI, ABC): + @abstractmethod + def get(self, dataset_id: int) -> OpenMLDataset | tuple[OpenMLDataset, Response]: ... + + +class TasksAPI(ResourceAPI, ABC): + @abstractmethod + def get( + self, + task_id: int, + ) -> OpenMLTask: + """ + API v1: + GET /task/{task_id} + + API v2: + GET /tasks/{task_id} + """ + ... + + # Task listing (V1 only) + @abstractmethod + def list( + self, + limit: int, + offset: int, + task_type: TaskType | int | None = None, + **kwargs: Any, + ) -> pd.DataFrame: + """ + List tasks with filters. + + API v1: + GET /task/list + + API v2: + Not available. + + Returns + ------- + pandas.DataFrame + """ + ... diff --git a/openml/_api/resources/datasets.py b/openml/_api/resources/datasets.py new file mode 100644 index 000000000..9ff1ec278 --- /dev/null +++ b/openml/_api/resources/datasets.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from openml._api.resources.base import DatasetsAPI + +if TYPE_CHECKING: + from responses import Response + + from openml.datasets.dataset import OpenMLDataset + + +class DatasetsV1(DatasetsAPI): + def get(self, dataset_id: int) -> OpenMLDataset | tuple[OpenMLDataset, Response]: + raise NotImplementedError + + +class DatasetsV2(DatasetsAPI): + def get(self, dataset_id: int) -> OpenMLDataset | tuple[OpenMLDataset, Response]: + raise NotImplementedError diff --git a/openml/_api/resources/tasks.py b/openml/_api/resources/tasks.py new file mode 100644 index 000000000..300efedf9 --- /dev/null +++ b/openml/_api/resources/tasks.py @@ -0,0 +1,384 @@ +from __future__ import annotations + +import builtins +import warnings +from typing import Any + +import pandas as pd +import xmltodict + +from openml._api.resources.base import TasksAPI +from openml.tasks.task import ( + OpenMLClassificationTask, + OpenMLClusteringTask, + OpenMLLearningCurveTask, + OpenMLRegressionTask, + OpenMLTask, + TaskType, +) + +TASKS_CACHE_DIR_NAME = "tasks" + + +class TasksV1(TasksAPI): + def get(self, task_id: int) -> OpenMLTask: + """Download OpenML task for a given task ID. + + Downloads the task representation. + + Parameters + ---------- + task_id : int + The OpenML task id of the task to download. + get_dataset_kwargs : + Args and kwargs can be used pass optional parameters to + :meth:`openml.datasets.get_dataset`. + + Returns + ------- + task: OpenMLTask + """ + if not isinstance(task_id, int): + raise TypeError(f"Task id should be integer, is {type(task_id)}") + + response = self._http.get(f"task/{task_id}") + return self._create_task_from_xml(response.text) + + def _create_task_from_xml(self, xml: str) -> OpenMLTask: + """Create a task given a xml string. + + Parameters + ---------- + xml : string + Task xml representation. + + Returns + ------- + OpenMLTask + """ + dic = xmltodict.parse(xml)["oml:task"] + estimation_parameters = {} + inputs = {} + # Due to the unordered structure we obtain, we first have to extract + # the possible keys of oml:input; dic["oml:input"] is a list of + # OrderedDicts + + # Check if there is a list of inputs + if isinstance(dic["oml:input"], list): + for input_ in dic["oml:input"]: + name = input_["@name"] + inputs[name] = input_ + # Single input case + elif isinstance(dic["oml:input"], dict): + name = dic["oml:input"]["@name"] + inputs[name] = dic["oml:input"] + + evaluation_measures = None + if "evaluation_measures" in inputs: + evaluation_measures = inputs["evaluation_measures"]["oml:evaluation_measures"][ + "oml:evaluation_measure" + ] + + task_type = TaskType(int(dic["oml:task_type_id"])) + common_kwargs = { + "task_id": dic["oml:task_id"], + "task_type": dic["oml:task_type"], + "task_type_id": task_type, + "data_set_id": inputs["source_data"]["oml:data_set"]["oml:data_set_id"], + "evaluation_measure": evaluation_measures, + } + # TODO: add OpenMLClusteringTask? + if task_type in ( + TaskType.SUPERVISED_CLASSIFICATION, + TaskType.SUPERVISED_REGRESSION, + TaskType.LEARNING_CURVE, + ): + # Convert some more parameters + for parameter in inputs["estimation_procedure"]["oml:estimation_procedure"][ + "oml:parameter" + ]: + name = parameter["@name"] + text = parameter.get("#text", "") + estimation_parameters[name] = text + + common_kwargs["estimation_procedure_type"] = inputs["estimation_procedure"][ + "oml:estimation_procedure" + ]["oml:type"] + common_kwargs["estimation_procedure_id"] = int( + inputs["estimation_procedure"]["oml:estimation_procedure"]["oml:id"] + ) + + common_kwargs["estimation_parameters"] = estimation_parameters + common_kwargs["target_name"] = inputs["source_data"]["oml:data_set"][ + "oml:target_feature" + ] + common_kwargs["data_splits_url"] = inputs["estimation_procedure"][ + "oml:estimation_procedure" + ]["oml:data_splits_url"] + + cls = { + TaskType.SUPERVISED_CLASSIFICATION: OpenMLClassificationTask, + TaskType.SUPERVISED_REGRESSION: OpenMLRegressionTask, + TaskType.CLUSTERING: OpenMLClusteringTask, + TaskType.LEARNING_CURVE: OpenMLLearningCurveTask, + }.get(task_type) + if cls is None: + raise NotImplementedError(f"Task type {common_kwargs['task_type']} not supported.") + return cls(**common_kwargs) # type: ignore + + def list( + self, + limit: int, + offset: int, + task_type: TaskType | int | None = None, + **kwargs: Any, + ) -> pd.DataFrame: + """ + Perform the api call to return a number of tasks having the given filters. + + Parameters + ---------- + Filter task_type is separated from the other filters because + it is used as task_type in the task description, but it is named + type when used as a filter in list tasks call. + limit: int + offset: int + task_type : TaskType, optional + Refers to the type of task. + kwargs: dict, optional + Legal filter operators: tag, task_id (list), data_tag, status, limit, + offset, data_id, data_name, number_instances, number_features, + number_classes, number_missing_values. + + Returns + ------- + dataframe + """ + api_call = "task/list" + if limit is not None: + api_call += f"/limit/{limit}" + if offset is not None: + api_call += f"/offset/{offset}" + if task_type is not None: + tvalue = task_type.value if isinstance(task_type, TaskType) else task_type + api_call += f"/type/{tvalue}" + if kwargs is not None: + for operator, value in kwargs.items(): + if value is not None: + if operator == "task_id": + value = ",".join([str(int(i)) for i in value]) # noqa: PLW2901 + api_call += f"/{operator}/{value}" + + return self._fetch_tasks_df(api_call=api_call) + + def _fetch_tasks_df(self, api_call: str) -> pd.DataFrame: # noqa: C901, PLR0912 + """Returns a Pandas DataFrame with information about OpenML tasks. + + Parameters + ---------- + api_call : str + The API call specifying which tasks to return. + + Returns + ------- + A Pandas DataFrame with information about OpenML tasks. + + Raises + ------ + ValueError + If the XML returned by the OpenML API does not contain 'oml:tasks', '@xmlns:oml', + or has an incorrect value for '@xmlns:oml'. + KeyError + If an invalid key is found in the XML for a task. + """ + xml_string = self._http.get(api_call).text + + tasks_dict = xmltodict.parse(xml_string, force_list=("oml:task", "oml:input")) + # Minimalistic check if the XML is useful + if "oml:tasks" not in tasks_dict: + raise ValueError(f'Error in return XML, does not contain "oml:runs": {tasks_dict}') + + if "@xmlns:oml" not in tasks_dict["oml:tasks"]: + raise ValueError( + f'Error in return XML, does not contain "oml:runs"/@xmlns:oml: {tasks_dict}' + ) + + if tasks_dict["oml:tasks"]["@xmlns:oml"] != "http://openml.org/openml": + raise ValueError( + "Error in return XML, value of " + '"oml:runs"/@xmlns:oml is not ' + f'"http://openml.org/openml": {tasks_dict!s}', + ) + + assert isinstance(tasks_dict["oml:tasks"]["oml:task"], list), type(tasks_dict["oml:tasks"]) + + tasks = {} + procs = self._get_estimation_procedure_list() + proc_dict = {x["id"]: x for x in procs} + + for task_ in tasks_dict["oml:tasks"]["oml:task"]: + tid = None + try: + tid = int(task_["oml:task_id"]) + task_type_int = int(task_["oml:task_type_id"]) + try: + task_type_id = TaskType(task_type_int) + except ValueError as e: + warnings.warn( + f"Could not create task type id for {task_type_int} due to error {e}", + RuntimeWarning, + stacklevel=2, + ) + continue + + task = { + "tid": tid, + "ttid": task_type_id, + "did": int(task_["oml:did"]), + "name": task_["oml:name"], + "task_type": task_["oml:task_type"], + "status": task_["oml:status"], + } + + # Other task inputs + for _input in task_.get("oml:input", []): + if _input["@name"] == "estimation_procedure": + task[_input["@name"]] = proc_dict[int(_input["#text"])]["name"] + else: + value = _input.get("#text") + task[_input["@name"]] = value + + # The number of qualities can range from 0 to infinity + for quality in task_.get("oml:quality", []): + if "#text" not in quality: + quality_value = 0.0 + else: + quality["#text"] = float(quality["#text"]) + if abs(int(quality["#text"]) - quality["#text"]) < 0.0000001: + quality["#text"] = int(quality["#text"]) + quality_value = quality["#text"] + task[quality["@name"]] = quality_value + tasks[tid] = task + except KeyError as e: + if tid is not None: + warnings.warn( + f"Invalid xml for task {tid}: {e}\nFrom {task_}", + RuntimeWarning, + stacklevel=2, + ) + else: + warnings.warn( + f"Could not find key {e} in {task_}!", RuntimeWarning, stacklevel=2 + ) + + return pd.DataFrame.from_dict(tasks, orient="index") + + def _get_estimation_procedure_list(self) -> builtins.list[dict[str, Any]]: + """Return a list of all estimation procedures which are on OpenML. + + Returns + ------- + procedures : list + A list of all estimation procedures. Every procedure is represented by + a dictionary containing the following information: id, task type id, + name, type, repeats, folds, stratified. + """ + url_suffix = "estimationprocedure/list" + xml_string = self._http.get(url_suffix).text + + procs_dict = xmltodict.parse(xml_string) + # Minimalistic check if the XML is useful + if "oml:estimationprocedures" not in procs_dict: + raise ValueError("Error in return XML, does not contain tag oml:estimationprocedures.") + + if "@xmlns:oml" not in procs_dict["oml:estimationprocedures"]: + raise ValueError( + "Error in return XML, does not contain tag " + "@xmlns:oml as a child of oml:estimationprocedures.", + ) + + if procs_dict["oml:estimationprocedures"]["@xmlns:oml"] != "http://openml.org/openml": + raise ValueError( + "Error in return XML, value of " + "oml:estimationprocedures/@xmlns:oml is not " + "http://openml.org/openml, but {}".format( + str(procs_dict["oml:estimationprocedures"]["@xmlns:oml"]) + ), + ) + + procs: list[dict[str, Any]] = [] + for proc_ in procs_dict["oml:estimationprocedures"]["oml:estimationprocedure"]: + task_type_int = int(proc_["oml:ttid"]) + try: + task_type_id = TaskType(task_type_int) + procs.append( + { + "id": int(proc_["oml:id"]), + "task_type_id": task_type_id, + "name": proc_["oml:name"], + "type": proc_["oml:type"], + }, + ) + except ValueError as e: + warnings.warn( + f"Could not create task type id for {task_type_int} due to error {e}", + RuntimeWarning, + stacklevel=2, + ) + + return procs + + +class TasksV2(TasksAPI): + def get(self, task_id: int) -> OpenMLTask: + response = self._http.get(f"tasks/{task_id}") + return self._create_task_from_json(response.json()) + + def _create_task_from_json(self, task_json: dict) -> OpenMLTask: + task_type_id = TaskType(int(task_json["task_type_id"])) + + inputs = {i["name"]: i for i in task_json.get("input", [])} + + source = inputs["source_data"]["data_set"] + + common_kwargs = { + "task_id": int(task_json["id"]), + "task_type": task_json["task_type"], + "task_type_id": task_type_id, + "data_set_id": int(source["data_set_id"]), + "evaluation_measure": None, + } + + if task_type_id in ( + TaskType.SUPERVISED_CLASSIFICATION, + TaskType.SUPERVISED_REGRESSION, + TaskType.LEARNING_CURVE, + ): + est = inputs.get("estimation_procedure", {}).get("estimation_procedure") + + if est: + common_kwargs["estimation_procedure_id"] = int(est["id"]) + common_kwargs["estimation_procedure_type"] = est["type"] + common_kwargs["estimation_parameters"] = { + p["name"]: p.get("value") for p in est.get("parameter", []) + } + + common_kwargs["target_name"] = source.get("target_feature") + + cls = { + TaskType.SUPERVISED_CLASSIFICATION: OpenMLClassificationTask, + TaskType.SUPERVISED_REGRESSION: OpenMLRegressionTask, + TaskType.CLUSTERING: OpenMLClusteringTask, + TaskType.LEARNING_CURVE: OpenMLLearningCurveTask, + }[task_type_id] + + return cls(**common_kwargs) # type: ignore + + def list( + self, + limit: int, + offset: int, + task_type: TaskType | int | None = None, + **kwargs: Any, + ) -> pd.DataFrame: + raise NotImplementedError("Task listing is not available in API v2 yet.") diff --git a/openml/_api/runtime/__init__.py b/openml/_api/runtime/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openml/_api/runtime/core.py b/openml/_api/runtime/core.py new file mode 100644 index 000000000..483b74d3d --- /dev/null +++ b/openml/_api/runtime/core.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from openml._api.clients import HTTPCache, HTTPClient +from openml._api.config import settings +from openml._api.resources import ( + DatasetsV1, + DatasetsV2, + TasksV1, + TasksV2, +) + +if TYPE_CHECKING: + from openml._api.resources.base import DatasetsAPI, TasksAPI + + +class APIBackend: + def __init__(self, *, datasets: DatasetsAPI, tasks: TasksAPI): + self.datasets = datasets + self.tasks = tasks + + +def build_backend(version: str, *, strict: bool) -> APIBackend: + http_cache = HTTPCache( + path=Path(settings.cache.dir), + ttl=settings.cache.ttl, + ) + v1_http_client = HTTPClient( + server=settings.api.v1.server, + base_url=settings.api.v1.base_url, + api_key=settings.api.v1.api_key, + timeout=settings.api.v1.timeout, + retries=settings.connection.retries, + delay_method=settings.connection.delay_method, + delay_time=settings.connection.delay_time, + cache=http_cache, + ) + v2_http_client = HTTPClient( + server=settings.api.v2.server, + base_url=settings.api.v2.base_url, + api_key=settings.api.v2.api_key, + timeout=settings.api.v2.timeout, + retries=settings.connection.retries, + delay_method=settings.connection.delay_method, + delay_time=settings.connection.delay_time, + cache=http_cache, + ) + + v1 = APIBackend( + datasets=DatasetsV1(v1_http_client), + tasks=TasksV1(v1_http_client), + ) + + if version == "v1": + return v1 + + v2 = APIBackend( + datasets=DatasetsV2(v2_http_client), + tasks=TasksV2(v2_http_client), + ) + + if strict: + return v2 + + return v1 + + +class APIContext: + def __init__(self) -> None: + self._backend = build_backend("v1", strict=False) + + def set_version(self, version: str, *, strict: bool = False) -> None: + self._backend = build_backend(version=version, strict=strict) + + @property + def backend(self) -> APIBackend: + return self._backend diff --git a/openml/_api/runtime/fallback.py b/openml/_api/runtime/fallback.py new file mode 100644 index 000000000..1bc99d270 --- /dev/null +++ b/openml/_api/runtime/fallback.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from openml._api.resources.base import ResourceAPI + + +class FallbackProxy: + def __init__(self, primary: ResourceAPI, fallback: ResourceAPI): + self._primary = primary + self._fallback = fallback diff --git a/openml/tasks/functions.py b/openml/tasks/functions.py index 3df2861c0..7c7973a4d 100644 --- a/openml/tasks/functions.py +++ b/openml/tasks/functions.py @@ -1,19 +1,16 @@ # License: BSD 3-Clause from __future__ import annotations -import os -import re import warnings from functools import partial -from typing import Any +from typing import TYPE_CHECKING, Any import pandas as pd -import xmltodict -import openml._api_calls import openml.utils +from openml._api import api_context +from openml._api.resources.tasks import TasksV1, TasksV2 from openml.datasets import get_dataset -from openml.exceptions import OpenMLCacheException from .task import ( OpenMLClassificationTask, @@ -21,109 +18,13 @@ OpenMLLearningCurveTask, OpenMLRegressionTask, OpenMLSupervisedTask, - OpenMLTask, TaskType, ) -TASKS_CACHE_DIR_NAME = "tasks" - - -def _get_cached_tasks() -> dict[int, OpenMLTask]: - """Return a dict of all the tasks which are cached locally. - - Returns - ------- - tasks : OrderedDict - A dict of all the cached tasks. Each task is an instance of - OpenMLTask. - """ - task_cache_dir = openml.utils._create_cache_directory(TASKS_CACHE_DIR_NAME) - directory_content = os.listdir(task_cache_dir) # noqa: PTH208 - directory_content.sort() - - # Find all dataset ids for which we have downloaded the dataset - # description - tids = (int(did) for did in directory_content if re.match(r"[0-9]*", did)) - return {tid: _get_cached_task(tid) for tid in tids} - - -def _get_cached_task(tid: int) -> OpenMLTask: - """Return a cached task based on the given id. - - Parameters - ---------- - tid : int - Id of the task. - - Returns - ------- - OpenMLTask - """ - tid_cache_dir = openml.utils._create_cache_directory_for_id(TASKS_CACHE_DIR_NAME, tid) - - task_xml_path = tid_cache_dir / "task.xml" - try: - with task_xml_path.open(encoding="utf8") as fh: - return _create_task_from_xml(fh.read()) - except OSError as e: - openml.utils._remove_cache_dir_for_id(TASKS_CACHE_DIR_NAME, tid_cache_dir) - raise OpenMLCacheException(f"Task file for tid {tid} not cached") from e - - -def _get_estimation_procedure_list() -> list[dict[str, Any]]: - """Return a list of all estimation procedures which are on OpenML. - - Returns - ------- - procedures : list - A list of all estimation procedures. Every procedure is represented by - a dictionary containing the following information: id, task type id, - name, type, repeats, folds, stratified. - """ - url_suffix = "estimationprocedure/list" - xml_string = openml._api_calls._perform_api_call(url_suffix, "get") - - procs_dict = xmltodict.parse(xml_string) - # Minimalistic check if the XML is useful - if "oml:estimationprocedures" not in procs_dict: - raise ValueError("Error in return XML, does not contain tag oml:estimationprocedures.") - - if "@xmlns:oml" not in procs_dict["oml:estimationprocedures"]: - raise ValueError( - "Error in return XML, does not contain tag " - "@xmlns:oml as a child of oml:estimationprocedures.", - ) - - if procs_dict["oml:estimationprocedures"]["@xmlns:oml"] != "http://openml.org/openml": - raise ValueError( - "Error in return XML, value of " - "oml:estimationprocedures/@xmlns:oml is not " - "http://openml.org/openml, but {}".format( - str(procs_dict["oml:estimationprocedures"]["@xmlns:oml"]) - ), - ) - - procs: list[dict[str, Any]] = [] - for proc_ in procs_dict["oml:estimationprocedures"]["oml:estimationprocedure"]: - task_type_int = int(proc_["oml:ttid"]) - try: - task_type_id = TaskType(task_type_int) - procs.append( - { - "id": int(proc_["oml:id"]), - "task_type_id": task_type_id, - "name": proc_["oml:name"], - "type": proc_["oml:type"], - }, - ) - except ValueError as e: - warnings.warn( - f"Could not create task type id for {task_type_int} due to error {e}", - RuntimeWarning, - stacklevel=2, - ) - - return procs +if TYPE_CHECKING: + from .task import ( + OpenMLTask, + ) def list_tasks( # noqa: PLR0913 @@ -175,7 +76,7 @@ def list_tasks( # noqa: PLR0913 calculated for the associated dataset, some of these are also returned. """ listing_call = partial( - _list_tasks, + api_context.backend.tasks.list, task_type=task_type, tag=tag, data_tag=data_tag, @@ -194,151 +95,6 @@ def list_tasks( # noqa: PLR0913 return pd.concat(batches) -def _list_tasks( - limit: int, - offset: int, - task_type: TaskType | int | None = None, - **kwargs: Any, -) -> pd.DataFrame: - """ - Perform the api call to return a number of tasks having the given filters. - - Parameters - ---------- - Filter task_type is separated from the other filters because - it is used as task_type in the task description, but it is named - type when used as a filter in list tasks call. - limit: int - offset: int - task_type : TaskType, optional - Refers to the type of task. - kwargs: dict, optional - Legal filter operators: tag, task_id (list), data_tag, status, limit, - offset, data_id, data_name, number_instances, number_features, - number_classes, number_missing_values. - - Returns - ------- - dataframe - """ - api_call = "task/list" - if limit is not None: - api_call += f"/limit/{limit}" - if offset is not None: - api_call += f"/offset/{offset}" - if task_type is not None: - tvalue = task_type.value if isinstance(task_type, TaskType) else task_type - api_call += f"/type/{tvalue}" - if kwargs is not None: - for operator, value in kwargs.items(): - if value is not None: - if operator == "task_id": - value = ",".join([str(int(i)) for i in value]) # noqa: PLW2901 - api_call += f"/{operator}/{value}" - - return __list_tasks(api_call=api_call) - - -def __list_tasks(api_call: str) -> pd.DataFrame: # noqa: C901, PLR0912 - """Returns a Pandas DataFrame with information about OpenML tasks. - - Parameters - ---------- - api_call : str - The API call specifying which tasks to return. - - Returns - ------- - A Pandas DataFrame with information about OpenML tasks. - - Raises - ------ - ValueError - If the XML returned by the OpenML API does not contain 'oml:tasks', '@xmlns:oml', - or has an incorrect value for '@xmlns:oml'. - KeyError - If an invalid key is found in the XML for a task. - """ - xml_string = openml._api_calls._perform_api_call(api_call, "get") - tasks_dict = xmltodict.parse(xml_string, force_list=("oml:task", "oml:input")) - # Minimalistic check if the XML is useful - if "oml:tasks" not in tasks_dict: - raise ValueError(f'Error in return XML, does not contain "oml:runs": {tasks_dict}') - - if "@xmlns:oml" not in tasks_dict["oml:tasks"]: - raise ValueError( - f'Error in return XML, does not contain "oml:runs"/@xmlns:oml: {tasks_dict}' - ) - - if tasks_dict["oml:tasks"]["@xmlns:oml"] != "http://openml.org/openml": - raise ValueError( - "Error in return XML, value of " - '"oml:runs"/@xmlns:oml is not ' - f'"http://openml.org/openml": {tasks_dict!s}', - ) - - assert isinstance(tasks_dict["oml:tasks"]["oml:task"], list), type(tasks_dict["oml:tasks"]) - - tasks = {} - procs = _get_estimation_procedure_list() - proc_dict = {x["id"]: x for x in procs} - - for task_ in tasks_dict["oml:tasks"]["oml:task"]: - tid = None - try: - tid = int(task_["oml:task_id"]) - task_type_int = int(task_["oml:task_type_id"]) - try: - task_type_id = TaskType(task_type_int) - except ValueError as e: - warnings.warn( - f"Could not create task type id for {task_type_int} due to error {e}", - RuntimeWarning, - stacklevel=2, - ) - continue - - task = { - "tid": tid, - "ttid": task_type_id, - "did": int(task_["oml:did"]), - "name": task_["oml:name"], - "task_type": task_["oml:task_type"], - "status": task_["oml:status"], - } - - # Other task inputs - for _input in task_.get("oml:input", []): - if _input["@name"] == "estimation_procedure": - task[_input["@name"]] = proc_dict[int(_input["#text"])]["name"] - else: - value = _input.get("#text") - task[_input["@name"]] = value - - # The number of qualities can range from 0 to infinity - for quality in task_.get("oml:quality", []): - if "#text" not in quality: - quality_value = 0.0 - else: - quality["#text"] = float(quality["#text"]) - if abs(int(quality["#text"]) - quality["#text"]) < 0.0000001: - quality["#text"] = int(quality["#text"]) - quality_value = quality["#text"] - task[quality["@name"]] = quality_value - tasks[tid] = task - except KeyError as e: - if tid is not None: - warnings.warn( - f"Invalid xml for task {tid}: {e}\nFrom {task_}", - RuntimeWarning, - stacklevel=2, - ) - else: - warnings.warn(f"Could not find key {e} in {task_}!", RuntimeWarning, stacklevel=2) - - return pd.DataFrame.from_dict(tasks, orient="index") - - def get_tasks( task_ids: list[int], download_data: bool | None = None, @@ -346,7 +102,7 @@ def get_tasks( ) -> list[OpenMLTask]: """Download tasks. - This function iterates :meth:`openml.tasks.get_task`. + This function iterates :meth:`openml.tasks.get`. Parameters ---------- @@ -385,7 +141,6 @@ def get_tasks( return tasks -@openml.utils.thread_safe_if_oslo_installed def get_task( task_id: int, download_splits: bool = False, # noqa: FBT002 @@ -415,129 +170,27 @@ def get_task( if not isinstance(task_id, int): raise TypeError(f"Task id should be integer, is {type(task_id)}") - cache_key_dir = openml.utils._create_cache_directory_for_id(TASKS_CACHE_DIR_NAME, task_id) - tid_cache_dir = cache_key_dir / str(task_id) - tid_cache_dir_existed = tid_cache_dir.exists() - try: - task = _get_task_description(task_id) - dataset = get_dataset(task.dataset_id, **get_dataset_kwargs) - # List of class labels available in dataset description - # Including class labels as part of task meta data handles - # the case where data download was initially disabled - if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)): - task.class_labels = dataset.retrieve_class_labels(task.target_name) - # Clustering tasks do not have class labels - # and do not offer download_split - if download_splits and isinstance(task, OpenMLSupervisedTask): - task.download_split() - except Exception as e: - if not tid_cache_dir_existed: - openml.utils._remove_cache_dir_for_id(TASKS_CACHE_DIR_NAME, tid_cache_dir) - raise e - - return task - - -def _get_task_description(task_id: int) -> OpenMLTask: - try: - return _get_cached_task(task_id) - except OpenMLCacheException: - _cache_dir = openml.utils._create_cache_directory_for_id(TASKS_CACHE_DIR_NAME, task_id) - xml_file = _cache_dir / "task.xml" - task_xml = openml._api_calls._perform_api_call(f"task/{task_id}", "get") - - with xml_file.open("w", encoding="utf8") as fh: - fh.write(task_xml) - return _create_task_from_xml(task_xml) - - -def _create_task_from_xml(xml: str) -> OpenMLTask: - """Create a task given a xml string. + task = api_context.backend.tasks.get(task_id) + dataset = get_dataset(task.dataset_id, **get_dataset_kwargs) - Parameters - ---------- - xml : string - Task xml representation. + if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)): + task.class_labels = dataset.retrieve_class_labels(task.target_name) - Returns - ------- - OpenMLTask - """ - dic = xmltodict.parse(xml)["oml:task"] - estimation_parameters = {} - inputs = {} - # Due to the unordered structure we obtain, we first have to extract - # the possible keys of oml:input; dic["oml:input"] is a list of - # OrderedDicts - - # Check if there is a list of inputs - if isinstance(dic["oml:input"], list): - for input_ in dic["oml:input"]: - name = input_["@name"] - inputs[name] = input_ - # Single input case - elif isinstance(dic["oml:input"], dict): - name = dic["oml:input"]["@name"] - inputs[name] = dic["oml:input"] - - evaluation_measures = None - if "evaluation_measures" in inputs: - evaluation_measures = inputs["evaluation_measures"]["oml:evaluation_measures"][ - "oml:evaluation_measure" - ] - - task_type = TaskType(int(dic["oml:task_type_id"])) - common_kwargs = { - "task_id": dic["oml:task_id"], - "task_type": dic["oml:task_type"], - "task_type_id": task_type, - "data_set_id": inputs["source_data"]["oml:data_set"]["oml:data_set_id"], - "evaluation_measure": evaluation_measures, - } - # TODO: add OpenMLClusteringTask? - if task_type in ( - TaskType.SUPERVISED_CLASSIFICATION, - TaskType.SUPERVISED_REGRESSION, - TaskType.LEARNING_CURVE, + if ( + download_splits + and isinstance(task, OpenMLSupervisedTask) + and isinstance(api_context.backend.tasks, TasksV1) ): - # Convert some more parameters - for parameter in inputs["estimation_procedure"]["oml:estimation_procedure"][ - "oml:parameter" - ]: - name = parameter["@name"] - text = parameter.get("#text", "") - estimation_parameters[name] = text - - common_kwargs["estimation_procedure_type"] = inputs["estimation_procedure"][ - "oml:estimation_procedure" - ]["oml:type"] - common_kwargs["estimation_procedure_id"] = int( - inputs["estimation_procedure"]["oml:estimation_procedure"]["oml:id"] + task.download_split() + elif download_splits and isinstance(api_context.backend.tasks, TasksV2): + warnings.warn( + "`download_splits` is not yet supported in the v2 API and will be ignored.", + stacklevel=2, ) - common_kwargs["estimation_parameters"] = estimation_parameters - common_kwargs["target_name"] = inputs["source_data"]["oml:data_set"]["oml:target_feature"] - common_kwargs["data_splits_url"] = inputs["estimation_procedure"][ - "oml:estimation_procedure" - ]["oml:data_splits_url"] - - cls = { - TaskType.SUPERVISED_CLASSIFICATION: OpenMLClassificationTask, - TaskType.SUPERVISED_REGRESSION: OpenMLRegressionTask, - TaskType.CLUSTERING: OpenMLClusteringTask, - TaskType.LEARNING_CURVE: OpenMLLearningCurveTask, - }.get(task_type) - if cls is None: - raise NotImplementedError( - f"Task type '{common_kwargs['task_type']}' is not supported. " - f"Supported task types: SUPERVISED_CLASSIFICATION," - f"SUPERVISED_REGRESSION, CLUSTERING, LEARNING_CURVE." - f"Please check the OpenML documentation for available task types." - ) - return cls(**common_kwargs) # type: ignore + return task -# TODO(eddiebergman): overload on `task_type` def create_task( task_type: TaskType, dataset_id: int, @@ -589,13 +242,7 @@ def create_task( elif task_type == TaskType.SUPERVISED_REGRESSION: task_cls = OpenMLRegressionTask # type: ignore else: - raise NotImplementedError( - f"Task type ID {task_type:d} is not supported. " - f"Supported task type IDs: {TaskType.SUPERVISED_CLASSIFICATION.value}," - f"{TaskType.SUPERVISED_REGRESSION.value}, " - f"{TaskType.CLUSTERING.value}, {TaskType.LEARNING_CURVE.value}. " - f"Please refer to the TaskType enum for valid task type identifiers." - ) + raise NotImplementedError(f"Task type {task_type:d} not supported.") return task_cls( task_type_id=task_type, diff --git a/tests/test_api/__init__.py b/tests/test_api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_api/test_tasks.py b/tests/test_api/test_tasks.py new file mode 100644 index 000000000..03c942032 --- /dev/null +++ b/tests/test_api/test_tasks.py @@ -0,0 +1,203 @@ +import unittest +from unittest.mock import MagicMock, patch, call +import pandas as pd + +from openml._api.resources.tasks import TasksV1, TasksV2 +from openml.tasks import ( + TaskType, + OpenMLClassificationTask, + OpenMLRegressionTask, + list_tasks, + get_task, + get_tasks, + delete_task, + create_task +) + +class TestTasksEndpoints(unittest.TestCase): + + def setUp(self): + # We mock the HTTP client (requests session) used by the API classes + self.mock_http = MagicMock() + + def test_v1_get_endpoint(self): + """Test GET task/{id} endpoint construction and parsing""" + api = TasksV1(self.mock_http) + + # We include two parameters to ensure xmltodict parses 'oml:parameter' + # as a list, preventing the TypeError seen previously. + self.mock_http.get.return_value.text = """ + + 1 + 1 + Supervised Classification + + + 100 + class + + + + + 1 + crossvalidation + http://splits + 10 + true + + + + """ + + task = api.get(1) + + self.mock_http.get.assert_called_with("task/1") + self.assertIsInstance(task, OpenMLClassificationTask) + self.assertEqual(task.task_id, 1) + + def test_v1_list_endpoint_url_construction(self): + """Test list tasks endpoint URL generation with filters""" + api = TasksV1(self.mock_http) + + # We mock `_fetch_tasks_df` because parsing the list XML is complex + # and we just want to verify the URL parameters here. + with patch.object(api, '_fetch_tasks_df') as mock_fetch: + api.list( + limit=100, + offset=50, + task_type=TaskType.SUPERVISED_CLASSIFICATION, + tag="study_14" + ) + + # Verify the constructed API call string passed to the fetcher + expected_call = "task/list/limit/100/offset/50/type/1/tag/study_14" + mock_fetch.assert_called_with(api_call=expected_call) + + + def test_v2_get_endpoint(self): + """Test GET tasks/{id} V2 endpoint""" + api = TasksV2(self.mock_http) + + # JSON response structure matches what V2 expects + self.mock_http.get.return_value.json.return_value = { + "id": 500, + "task_type_id": "2", # Regression + "task_type": "Supervised Regression", + "input": [ + { + "name": "source_data", + "data_set": {"data_set_id": "99", "target_feature": "price"} + }, + { + "name": "estimation_procedure", + "estimation_procedure": { + "id": "5", + "type": "cv", + "parameter": [] + } + } + ] + } + + task = api.get(500) + + self.mock_http.get.assert_called_with("tasks/500") + self.assertIsInstance(task, OpenMLRegressionTask) + self.assertEqual(task.target_name, "price") + + def test_v2_list_not_available(self): + """Ensure V2 list endpoint raises error (as per code)""" + api = TasksV2(self.mock_http) + with self.assertRaises(NotImplementedError): + api.list(limit=10, offset=0) + + +class TestTaskHighLevelFunctions(unittest.TestCase): + """Test the user-facing functions in functions.py""" + + @patch("openml.tasks.functions.api_context") + def test_list_tasks_wrapper(self, mock_api_context): + """Test list_tasks() calls the backend correctly""" + # Setup backend to return a dummy dataframe + mock_api_context.backend.tasks.list.return_value = pd.DataFrame({'id': [1]}) + + list_tasks( + task_type=TaskType.SUPERVISED_CLASSIFICATION, + offset=10, + size=50, + tag="my_tag" + ) + + # The backend list method is called with positional arguments for limit (size) + # and offset because of how `_list_all` works internally. + mock_api_context.backend.tasks.list.assert_called_with( + 50, # limit (size) + 10, # offset + task_type=TaskType.SUPERVISED_CLASSIFICATION, + tag="my_tag", + data_tag=None, + status=None, + data_id=None, + data_name=None, + number_instances=None, + number_features=None, + number_classes=None, + number_missing_values=None + ) + + @patch("openml.tasks.functions.get_dataset") + @patch("openml.tasks.functions.api_context") + def test_get_task_wrapper(self, mock_api_context, mock_get_dataset): + """Test get_task() retrieves task and dataset""" + # Mock Task + mock_task_obj = MagicMock() + mock_task_obj.dataset_id = 123 + mock_task_obj.target_name = "class" + mock_api_context.backend.tasks.get.return_value = mock_task_obj + + # Mock Dataset (needed for class labels) + mock_dataset = MagicMock() + mock_get_dataset.return_value = mock_dataset + + get_task(task_id=10, download_data=False) + + # Verify calls + mock_api_context.backend.tasks.get.assert_called_with(10) + + # `get_task` passes kwargs directly to get_dataset. + mock_get_dataset.assert_called_with(123, download_data=False) + + @patch("openml.tasks.functions.get_task") + def test_get_tasks_list_wrapper(self, mock_get_task): + """Test get_tasks() iterates and calls get_task() for each ID""" + ids_to_fetch = [100, 101] + + # Execute the bulk fetch + get_tasks(ids_to_fetch, download_data=False, download_qualities=False) + + # Verify `get_task` was called exactly twice + self.assertEqual(mock_get_task.call_count, 2) + + # Verify the arguments for each call + expected_calls = [ + call(100, download_data=False, download_qualities=False), + call(101, download_data=False, download_qualities=False) + ] + mock_get_task.assert_has_calls(expected_calls) + + @patch("openml.utils._delete_entity") + def test_delete_task_wrapper(self, mock_delete): + """Test delete_task() hits the delete endpoint""" + delete_task(999) + mock_delete.assert_called_with("task", 999) + + def test_create_task_factory(self): + """Test create_task() returns correct object (no API call until publish)""" + task = create_task( + task_type=TaskType.SUPERVISED_CLASSIFICATION, + dataset_id=1, + estimation_procedure_id=1, + target_name="class" + ) + self.assertIsInstance(task, OpenMLClassificationTask) + self.assertEqual(task.dataset_id, 1) \ No newline at end of file