API Reference

Methods:

Name                Description
from_env            Creates an instance of the InfluxDBClient3 class using environment variables for configuration.
write               Write data to InfluxDB.
write_file          Write data from a file to InfluxDB.
write_dataframe     Write a DataFrame to InfluxDB.
query               Query data from InfluxDB.
query_dataframe     Query data from InfluxDB and return as a DataFrame.
query_async         Query data from InfluxDB asynchronously.
get_server_version  Get the version of the connected InfluxDB server.
flush               Flush any buffered writes to InfluxDB without closing the client.
close               Close the client and clean up resources.
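
The class in the source listing that follows also defines `__enter__` and `__exit__`, so a client can be used in a `with` block and `close()` runs when the block exits. The sketch below illustrates only that protocol. `FakeClient` is a hypothetical stand-in, not part of the library, and it opens no connection.

```python
class FakeClient:
    """Hypothetical stand-in that mimics only the flush/close/context-manager protocol."""

    def __init__(self):
        self.closed = False
        self.flushed = 0

    def flush(self):
        # The real client forwards this call to its write API; here we only count calls.
        self.flushed += 1

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Mirrors the listing: leaving the block always closes the client.
        self.close()


with FakeClient() as client:
    client.flush()            # the client stays open after a flush
    assert not client.closed
```

With the real class the same shape would presumably apply: construct the client in the `with` statement, write or query inside the block, and rely on the exit hook to release resources.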

Source code in influxdb_client_3/__init__.py
class InfluxDBClient3:
    def __init__(
            self,
            host=None,
            org=None,
            database=None,
            token=None,
            write_client_options=None,
            flight_client_options=None,
            write_port_overwrite=None,
            query_port_overwrite=None,
            disable_grpc_compression=False,
            **kwargs):
        """
        Initialize an InfluxDB client.

        :param host: The hostname or IP address of the InfluxDB server.
        :type host: str
        :param org: The InfluxDB organization name for operations.
        :type org: str
        :param database: The database for InfluxDB operations.
        :type database: str
        :param token: The authentication token for accessing the InfluxDB server.
        :type token: str
        :param write_client_options: dictionary for providing additional arguments for the WriteApi client.
        :type write_client_options: dict[str, any]
        :param flight_client_options: dictionary for providing additional arguments for the FlightClient.
        :type flight_client_options: dict[str, any]
        :param write_port_overwrite: Port to use for write requests instead of the port parsed from ``host``.
        :type write_port_overwrite: int
        :param query_port_overwrite: Port to use for query (Flight) requests instead of the port parsed from ``host``.
        :type query_port_overwrite: int
        :param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
        :type disable_grpc_compression: bool
        :key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
        :key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
        :key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
        :key str cert_file: Path to the certificate that will be used for mTLS authentication.
        :key str cert_key_file: Path to the file that contains the private key for the mTLS certificate.
        :key str cert_key_password: String or function which returns password for decrypting the mTLS private key.
        :key ssl.SSLContext ssl_context: Specify a custom Python SSL Context for the TLS/mTLS handshake.
                                         Supply either certificate/key files or an SSL Context, not both.
        :key str proxy: Set this to configure the http proxy to be used (ex. http://localhost:3128)
        :key dict proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy
                                 authentication. (Applies to Write API only)
        :key int connection_pool_maxsize: Number of connections to save that can be reused by urllib3.
                                          Defaults to "multiprocessing.cpu_count() * 5".
        :key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests
                                               except batching writes. By default, no retry strategy is set.
        :key bool auth_basic: Set this to true to enable basic authentication when talking to an InfluxDB 1.8.x that
                              does not use auth-enabled but is protected by a reverse proxy with basic authentication.
                              (defaults to false, don't set to true when talking to InfluxDB 2)
        :key str username: ``username`` to authenticate via username and password credentials to the InfluxDB 2.x
        :key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
        :key int query_timeout: The client query API timeout in milliseconds.
        :key int write_timeout: The client write API timeout in milliseconds.
        :key list[str] profilers: list of enabled Flux profilers
        """
        self._org = org if org is not None else "default"
        self._database = database
        self._token = token
        kw_keys = kwargs.keys()

        write_type = DefaultWriteOptions.write_type.value
        write_precision = DefaultWriteOptions.write_precision.value
        write_no_sync = DefaultWriteOptions.no_sync.value
        write_timeout = DefaultWriteOptions.timeout.value

        if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
            write_opts = write_client_options['write_options']
            write_type = getattr(write_opts, 'write_type', write_type)
            write_precision = getattr(write_opts, 'write_precision', write_precision)
            write_no_sync = getattr(write_opts, 'no_sync', write_no_sync)
            write_timeout = getattr(write_opts, 'timeout', write_timeout)

        if 'write_timeout' in kw_keys:
            write_timeout = kwargs.get('write_timeout')

        write_options = WriteOptions(
            write_type=write_type,
            write_precision=write_precision,
            no_sync=write_no_sync,
        )

        self._write_client_options = {
            "write_options": write_options,
            **(write_client_options or {})
        }

        # Parse the host input
        parsed_url = urllib.parse.urlparse(host)

        # Determine the protocol (scheme), hostname, and port
        scheme = parsed_url.scheme if parsed_url.scheme else "https"
        hostname = parsed_url.hostname if parsed_url.hostname else host
        port = parsed_url.port if parsed_url.port else 443

        # Construct the clients using the parsed values
        if write_port_overwrite is not None:
            port = write_port_overwrite

        self._client = _InfluxDBClient(
            url=f"{scheme}://{hostname}:{port}",
            token=self._token,
            org=self._org,
            timeout=write_timeout,
            **kwargs)

        self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)

        if query_port_overwrite is not None:
            port = query_port_overwrite
        if scheme == 'https':
            connection_string = f"grpc+tls://{hostname}:{port}"
        else:
            connection_string = f"grpc+tcp://{hostname}:{port}"

        q_opts_builder = QueryApiOptionsBuilder()
        if disable_grpc_compression:
            q_opts_builder.disable_grpc_compression(True)
        if 'ssl_ca_cert' in kw_keys:
            q_opts_builder.root_certs(kwargs.get('ssl_ca_cert', None))
        if 'verify_ssl' in kw_keys:
            q_opts_builder.tls_verify(kwargs.get('verify_ssl', True))
        if 'proxy' in kw_keys:
            q_opts_builder.proxy(kwargs.get('proxy', None))
        if 'query_timeout' in kw_keys:
            query_timeout_float = float(kwargs.get('query_timeout'))
            q_opts_builder.timeout(query_timeout_float / 1000.0)
        self._query_api = _QueryApi(connection_string=connection_string, token=token,
                                    flight_client_options=flight_client_options,
                                    proxy=kwargs.get("proxy", None), options=q_opts_builder.build())

    @classmethod
    def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
        """
        Creates an instance of the ``InfluxDBClient3`` class using environment
        variables for configuration. This method simplifies client creation by
        automatically reading required information from the system environment.

        It verifies the presence of required environment variables such as host,
        token, and database. If any of these variables are missing or empty,
        a ``ValueError`` will be raised. Optional parameters such as precision and
        authentication scheme will also be extracted from the environment when
        present, allowing further customization of the client.

        :param kwargs: Additional parameters that are passed to the client constructor.
        :type kwargs: Any
        :raises ValueError: If any required environment variables are missing or empty.
        :return: An initialized client object of type ``InfluxDBClient3``.
        :rtype: InfluxDBClient3
        """
        required_vars = {
            INFLUX_HOST: os.getenv(INFLUX_HOST),
            INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
            INFLUX_DATABASE: os.getenv(INFLUX_DATABASE)
        }
        missing_vars = [var for var, value in required_vars.items() if value is None or value == ""]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

        write_options = WriteOptions(write_type=WriteType.synchronous)

        gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD)
        if gzip_threshold is not None:
            kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold)
            kwargs['enable_gzip'] = True

        write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC)
        if write_no_sync is not None:
            write_options.no_sync = _parse_write_no_sync(write_no_sync)

        precision = os.getenv(INFLUX_PRECISION)
        if precision is not None:
            write_options.write_precision = _parse_precision(precision)

        write_timeout = os.getenv(INFLUX_WRITE_TIMEOUT)
        if write_timeout is not None:
            # N.B. the write_options value takes precedence over kwargs['write_timeout']
            write_options.timeout = _parse_timeout(write_timeout)

        query_timeout = os.getenv(INFLUX_QUERY_TIMEOUT)
        if query_timeout is not None:
            kwargs['query_timeout'] = _parse_timeout(query_timeout)

        write_client_option = {'write_options': write_options}

        if os.getenv(INFLUX_AUTH_SCHEME) is not None:
            kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)

        disable_grpc_compression = os.getenv(INFLUX_DISABLE_GRPC_COMPRESSION)
        if disable_grpc_compression is not None:
            disable_grpc_compression = disable_grpc_compression.strip().lower() in ['true', '1', 't', 'y', 'yes']
        else:
            disable_grpc_compression = False

        org = os.getenv(INFLUX_ORG, "default")
        return InfluxDBClient3(
            host=required_vars[INFLUX_HOST],
            token=required_vars[INFLUX_TOKEN],
            database=required_vars[INFLUX_DATABASE],
            write_client_options=write_client_option,
            org=org,
            disable_grpc_compression=disable_grpc_compression,
            **kwargs
        )

    def write(self, record=None, database=None, **kwargs):
        """
        Write data to InfluxDB.

        :param record: The data point(s) to write.
        :type record: object or list of objects
        :param database: The database to write to. If not provided, uses the database provided during initialization.
        :type database: str
        :param kwargs: Additional arguments to pass to the write API.
        """
        if database is None:
            database = self._database

        try:
            return self._write_api.write(bucket=database, record=record, **kwargs)
        except InfluxDBError as e:
            raise e

    def write_dataframe(
        self,
        df: "pd.DataFrame | pl.DataFrame",
        measurement: str,
        timestamp_column: str,
        tags: Optional[List[str]] = None,
        timestamp_timezone: Optional[str] = None,
        database: Optional[str] = None,
        **kwargs
    ):
        """
        Write a DataFrame to InfluxDB.

        This method supports both pandas and polars DataFrames, automatically detecting
        the DataFrame type and using the appropriate serializer.

        :param df: The DataFrame to write. Can be a pandas or polars DataFrame.
        :type df: pandas.DataFrame or polars.DataFrame
        :param measurement: The name of the measurement to write to.
        :type measurement: str
        :param timestamp_column: The name of the column containing timestamps.
                                 This parameter is required for consistency between pandas and polars.
        :type timestamp_column: str
        :param tags: List of column names to use as tags. Remaining columns will be fields.
        :type tags: list[str], optional
        :param timestamp_timezone: Timezone for the timestamp column (e.g., 'UTC', 'America/New_York').
        :type timestamp_timezone: str, optional
        :param database: The database to write to. If not provided, uses the database from initialization.
        :type database: str, optional
        :param kwargs: Additional arguments to pass to the write API.
        :raises TypeError: If df is not a pandas or polars DataFrame.
        :raises InfluxDBError: If there is an error writing to the database.

        Example:
            >>> import pandas as pd
            >>> df = pd.DataFrame({
            ...     'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
            ...     'city': ['London', 'Paris'],
            ...     'temperature': [15.0, 18.0]
            ... })
            >>> client.write_dataframe(
            ...     df,
            ...     measurement='weather',
            ...     timestamp_column='time',
            ...     tags=['city']
            ... )
        """
        if database is None:
            database = self._database

        # Detect DataFrame type
        df_type = str(type(df))
        if 'pandas' not in df_type and 'polars' not in df_type:
            raise TypeError(
                f"Expected a pandas or polars DataFrame, but got {type(df).__name__}. "
                "Please pass a valid DataFrame object."
            )

        try:
            return self._write_api.write(
                bucket=database,
                record=df,
                data_frame_measurement_name=measurement,
                data_frame_tag_columns=tags or [],
                data_frame_timestamp_column=timestamp_column,
                data_frame_timestamp_timezone=timestamp_timezone,
                **kwargs
            )
        except InfluxDBError as e:
            raise e

    def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None,
                   file_parser_options=None, **kwargs):
        """
        Write data from a file to InfluxDB.

        :param file: The file to write.
        :type file: str
        :param measurement_name: The name of the measurement.
        :type measurement_name: str
        :param tag_columns: Tag columns.
        :type tag_columns: list
        :param timestamp_column: Timestamp column name. Defaults to 'time'.
        :type timestamp_column: str
        :param database: The database to write to. If not provided, uses the database provided during initialization.
        :type database: str
        :param file_parser_options: Function for providing additional arguments for the file parser.
        :type file_parser_options: callable
        :param kwargs: Additional arguments to pass to the write API.
        """
        if database is None:
            database = self._database

        try:
            table = UploadFile(file, file_parser_options).load_file()
            df = table.to_pandas() if isinstance(table, pa.Table) else table
            self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database,
                                    **kwargs)
        except Exception as e:
            raise e

    def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column, database, **kwargs):
        # This function is factored out for clarity.
        # It processes a DataFrame before writing to InfluxDB.

        measurement_column = None
        if measurement_name is None:
            measurement_column = next((col for col in ['measurement', 'iox::measurement'] if col in df.columns), None)
            if measurement_column:
                for measurement in df[measurement_column].unique():
                    df_measurement = df[df[measurement_column] == measurement].drop(columns=[measurement_column])
                    self._write_api.write(bucket=database, record=df_measurement,
                                          data_frame_measurement_name=measurement,
                                          data_frame_tag_columns=tag_columns,
                                          data_frame_timestamp_column=timestamp_column, **kwargs)
            else:
                print("'measurement' column not found in the dataframe.")
        else:
            df = df.drop(columns=['measurement'], errors='ignore')
            self._write_api.write(bucket=database, record=df,
                                  data_frame_measurement_name=measurement_name,
                                  data_frame_tag_columns=tag_columns,
                                  data_frame_timestamp_column=timestamp_column, **kwargs)

    def query(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
        """Query data from InfluxDB.

        If you want to use query parameters, you can pass them as kwargs:

        >>> client.query("select * from cpu where host=$host", query_parameters={"host": "server01"})

        :param query: The query to execute on the database.
        :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
        :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
                     "reader" or "schema". Defaults to "all".
        :param database: The database to query from. If not provided, uses the database provided during initialization.
        :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to
                       set up per request headers.
        :keyword query_parameters: The query parameters to use in the query.
                                   It should be a ``dictionary`` of key-value pairs.
        :return: The query result in the specified mode.
        """
        if mode == "polars" and polars is False:
            raise ImportError("Polars is not installed. Please install it with `pip install polars`.")

        if database is None:
            database = self._database

        try:
            return self._query_api.query(query=query, language=language, mode=mode, database=database, **kwargs)
        except ArrowException as e:
            raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

    def query_dataframe(
        self,
        query: str,
        language: str = "sql",
        database: Optional[str] = None,
        frame_type: Literal["pandas", "polars"] = "pandas",
        **kwargs
    ) -> "pd.DataFrame | pl.DataFrame":
        """
        Query data from InfluxDB and return as a DataFrame.

        This is a convenience method that wraps query() and returns the result
        directly as a pandas or polars DataFrame.

        :param query: The query to execute on the database.
        :type query: str
        :param language: The query language to use. Should be "sql" or "influxql". Defaults to "sql".
        :type language: str
        :param database: The database to query from. If not provided, uses the database from initialization.
        :type database: str, optional
        :param frame_type: The type of DataFrame to return. Either "pandas" or "polars". Defaults to "pandas".
        :type frame_type: Literal["pandas", "polars"]
        :param kwargs: Additional arguments to pass to the query API.
        :keyword query_parameters: Query parameters as a dictionary of key-value pairs.
        :return: Query result as a pandas or polars DataFrame.
        :rtype: pandas.DataFrame or polars.DataFrame
        :raises ImportError: If polars is requested but not installed.

        Example:
            >>> # Query and get a pandas DataFrame
            >>> df = client.query_dataframe("SELECT * FROM weather WHERE city = 'London'")
            >>>
            >>> # Query and get a polars DataFrame
            >>> df = client.query_dataframe(
            ...     "SELECT * FROM weather",
            ...     frame_type="polars"
            ... )
        """
        if frame_type == "polars" and polars is False:
            raise ImportError(
                "Polars is not installed. Please install it with `pip install polars`."
            )

        return self.query(query=query, language=language, mode=frame_type, database=database, **kwargs)

    async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
        """Query data from InfluxDB asynchronously.

        If you want to use query parameters, you can pass them as kwargs:

        >>> await client.query_async("select * from cpu where host=$host", query_parameters={"host": "server01"})

        :param query: The query to execute on the database.
        :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
        :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
                     "reader" or "schema". Defaults to "all".
        :param database: The database to query from. If not provided, uses the database provided during initialization.
        :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to
                       set up per request headers.
        :keyword query_parameters: The query parameters to use in the query.
                                   It should be a ``dictionary`` of key-value pairs.
        :return: The query result in the specified mode.
        """
        if mode == "polars" and polars is False:
            raise ImportError("Polars is not installed. Please install it with `pip install polars`.")

        if database is None:
            database = self._database

        try:
            return await self._query_api.query_async(query=query,
                                                     language=language,
                                                     mode=mode,
                                                     database=database,
                                                     **kwargs)
        except ArrowException as e:
            raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

    def get_server_version(self) -> str:
        """
        Get the version of the connected InfluxDB server.

        This method makes a ping request to the server and extracts the version information
        from either the response headers or response body.

        :return: The version string of the InfluxDB server.
        :rtype: str
        """
        version = None
        (resp_body, _, header) = self._client.api_client.call_api(
            resource_path="/ping",
            method="GET",
            response_type=object
        )

        for key, value in header.items():
            if key.lower() == "x-influxdb-version":
                version = value
                break

        if version is None and isinstance(resp_body, dict):
            version = resp_body['version']

        return version

    def flush(self):
        """
        Flush any buffered writes to InfluxDB without closing the client.

        This method immediately sends all buffered data points to the server
        when using batching write mode. After flushing, the client remains
        open and ready for more writes.

        For synchronous write mode, this is a no-op since data is written
        immediately.
        """
        self._write_api.flush()

    def close(self):
        """Close the client and clean up resources."""
        self._write_api.close()
        self._query_api.close()
        self._client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
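
The constructor above derives two endpoints from `host`: an HTTP(S) URL for writes and a gRPC connection string for Flight queries. The following is a standalone sketch of that derivation, written with only the standard library. The function name `build_endpoints` is hypothetical, and the sketch reproduces only the parsing logic, not client construction.

```python
from urllib.parse import urlparse


def build_endpoints(host, write_port_overwrite=None, query_port_overwrite=None):
    """Return (write_url, flight_connection_string) following the constructor's parsing steps."""
    parsed = urlparse(host)
    scheme = parsed.scheme if parsed.scheme else "https"    # default scheme
    hostname = parsed.hostname if parsed.hostname else host
    port = parsed.port if parsed.port else 443              # default port

    if write_port_overwrite is not None:
        port = write_port_overwrite
    write_url = f"{scheme}://{hostname}:{port}"

    # As in the listing, the same `port` variable is reused, so a write
    # override carries over to queries unless a query override is also given.
    if query_port_overwrite is not None:
        port = query_port_overwrite
    grpc_scheme = "grpc+tls" if scheme == "https" else "grpc+tcp"
    return write_url, f"{grpc_scheme}://{hostname}:{port}"


print(build_endpoints("https://example.com"))
print(build_endpoints("http://localhost:8181"))
```

One consequence worth noting: a bare host name with no scheme falls back to `https` and port 443, while a host of the form `name:port` without a scheme may be parsed differently by `urlparse`, so including the scheme explicitly is the safer choice.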

from_env(**kwargs) classmethod

Creates an instance of the InfluxDBClient3 class using environment variables for configuration. This method simplifies client creation by automatically reading required information from the system environment.

It verifies the presence of required environment variables such as host, token, and database. If any of these variables are missing or empty, a ValueError will be raised. Optional parameters such as precision and authentication scheme will also be extracted from the environment when present, allowing further customization of the client.

:param kwargs: Additional parameters that are passed to the client constructor.
:type kwargs: Any
:raises ValueError: If any required environment variables are missing or empty.
:return: An initialized client object of type InfluxDBClient3.
:rtype: InfluxDBClient3
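
The validation described above can be sketched without the library. The example assumes that the constants `INFLUX_HOST`, `INFLUX_TOKEN`, and `INFLUX_DATABASE` resolve to environment variables of the same names, which the listing does not show. The helper names `missing_required` and `parse_flag` are hypothetical.

```python
import os

# Assumption: the module constants map to same-named environment variables.
REQUIRED = ("INFLUX_HOST", "INFLUX_TOKEN", "INFLUX_DATABASE")
TRUTHY = ("true", "1", "t", "y", "yes")


def missing_required(env=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED if not env.get(name)]


def parse_flag(raw):
    """Interpret an optional boolean variable the way the listing treats the gRPC compression flag."""
    return raw is not None and raw.strip().lower() in TRUTHY


env = {"INFLUX_HOST": "https://example.com", "INFLUX_TOKEN": ""}
missing = missing_required(env)
if missing:
    # from_env raises ValueError at this point; the sketch only reports.
    print(f"Missing required environment variables: {', '.join(missing)}")
```

An empty string counts as missing, so exporting a variable with no value does not satisfy the check.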

Source code in influxdb_client_3/__init__.py
@classmethod
def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
    """
    Creates an instance of the ``InfluxDBClient3`` class using environment
    variables for configuration. This method simplifies client creation by
    automatically reading required information from the system environment.

    It verifies the presence of required environment variables such as host,
    token, and database. If any of these variables are missing or empty,
    a ``ValueError`` will be raised. Optional parameters such as precision and
    authentication scheme will also be extracted from the environment when
    present, allowing further customization of the client.

    :param kwargs: Additional parameters that are passed to the client constructor.
    :type kwargs: Any
    :raises ValueError: If any required environment variables are missing or empty.
    :return: An initialized client object of type ``InfluxDBClient3``.
    :rtype: InfluxDBClient3
    """
    required_vars = {
        INFLUX_HOST: os.getenv(INFLUX_HOST),
        INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
        INFLUX_DATABASE: os.getenv(INFLUX_DATABASE)
    }
    missing_vars = [var for var, value in required_vars.items() if value is None or value == ""]
    if missing_vars:
        raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

    write_options = WriteOptions(write_type=WriteType.synchronous)

    gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD)
    if gzip_threshold is not None:
        kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold)
        kwargs['enable_gzip'] = True

    write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC)
    if write_no_sync is not None:
        write_options.no_sync = _parse_write_no_sync(write_no_sync)

    precision = os.getenv(INFLUX_PRECISION)
    if precision is not None:
        write_options.write_precision = _parse_precision(precision)

    write_timeout = os.getenv(INFLUX_WRITE_TIMEOUT)
    if write_timeout is not None:
        # N.B. the write_options value takes precedence over kwargs['write_timeout']
        write_options.timeout = _parse_timeout(write_timeout)

    query_timeout = os.getenv(INFLUX_QUERY_TIMEOUT)
    if query_timeout is not None:
        kwargs['query_timeout'] = _parse_timeout(query_timeout)

    write_client_option = {'write_options': write_options}

    if os.getenv(INFLUX_AUTH_SCHEME) is not None:
        kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)

    disable_grpc_compression = os.getenv(INFLUX_DISABLE_GRPC_COMPRESSION)
    if disable_grpc_compression is not None:
        disable_grpc_compression = disable_grpc_compression.strip().lower() in ['true', '1', 't', 'y', 'yes']
    else:
        disable_grpc_compression = False

    org = os.getenv(INFLUX_ORG, "default")
    return InfluxDBClient3(
        host=required_vars[INFLUX_HOST],
        token=required_vars[INFLUX_TOKEN],
        database=required_vars[INFLUX_DATABASE],
        write_client_options=write_client_option,
        org=org,
        disable_grpc_compression=disable_grpc_compression,
        **kwargs
    )

write(record=None, database=None, **kwargs)

Write data to InfluxDB.

:param record: The data point(s) to write.
:type record: object or list of objects
:param database: The database to write to. If not provided, uses the database provided during initialization.
:type database: str
:param kwargs: Additional arguments to pass to the write API.
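
The docstring leaves the shape of `record` open. One common form of data point in InfluxDB is a line protocol string, and the sketch below builds such a string by hand. That `write` accepts the resulting string as `record` is an assumption here, and the helpers `escape_key`, `format_field`, and `to_line` are hypothetical names that cover only basic escaping.

```python
def escape_key(value):
    """Escape commas, equals signs and spaces in tag keys, tag values and field keys."""
    return str(value).replace(",", r"\,").replace("=", r"\=").replace(" ", r"\ ")


def format_field(value):
    """Render a field value: booleans, integers (i suffix), floats, or quoted strings."""
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_line(measurement, tags, fields, timestamp_ns=None):
    """Build 'measurement,tag=value field=value timestamp' with tags sorted by key."""
    name = measurement.replace(",", r"\,").replace(" ", r"\ ")
    tag_part = "".join(f",{escape_key(k)}={escape_key(v)}" for k, v in sorted(tags.items()))
    field_part = ",".join(f"{escape_key(k)}={format_field(v)}" for k, v in fields.items())
    line = f"{name}{tag_part} {field_part}"
    return line if timestamp_ns is None else f"{line} {timestamp_ns}"


line = to_line("weather", {"city": "London"}, {"temperature": 15.0}, 1704067200000000000)
print(line)
```

Under that assumption, the string could then be passed along the lines of `client.write(record=line)`, with `database` omitted to fall back to the database given at initialization.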

Source code in influxdb_client_3/__init__.py
def write(self, record=None, database=None, **kwargs):
    """
    Write data to InfluxDB.

    :param record: The data point(s) to write.
    :type record: object or list of objects
    :param database: The database to write to. If not provided, uses the database provided during initialization.
    :type database: str
    :param kwargs: Additional arguments to pass to the write API.
    """
    if database is None:
        database = self._database

    try:
        return self._write_api.write(bucket=database, record=record, **kwargs)
    except InfluxDBError as e:
        raise e

write_file(file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None, file_parser_options=None, **kwargs)

Write data from a file to InfluxDB.

:param file: The file to write.
:type file: str
:param measurement_name: The name of the measurement.
:type measurement_name: str
:param tag_columns: Tag columns.
:type tag_columns: list
:param timestamp_column: Timestamp column name. Defaults to 'time'.
:type timestamp_column: str
:param database: The database to write to. If not provided, uses the database provided during initialization.
:type database: str
:param file_parser_options: Function for providing additional arguments for the file parser.
:type file_parser_options: callable
:param kwargs: Additional arguments to pass to the write API.
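
When `measurement_name` is omitted, the class source listing shows the loaded data being split on a `measurement` or `iox::measurement` column, with one write per distinct value. The following sketch reproduces that grouping step on CSV text using only the standard library. It stands in for the pandas-based code and performs no writes, and the name `split_by_measurement` is hypothetical.

```python
import csv
import io
from collections import defaultdict

MEASUREMENT_COLUMNS = ("measurement", "iox::measurement")


def split_by_measurement(rows):
    """Group rows by the first measurement column present, dropping that column."""
    if not rows:
        return {}
    column = next((c for c in MEASUREMENT_COLUMNS if c in rows[0]), None)
    if column is None:
        return {}  # the listing prints a notice and writes nothing in this case
    groups = defaultdict(list)
    for row in rows:
        remaining = {k: v for k, v in row.items() if k != column}
        groups[row[column]].append(remaining)
    return dict(groups)


sample = io.StringIO(
    "measurement,time,city,temperature\n"
    "weather,2024-01-01,London,15.0\n"
    "air,2024-01-01,London,42\n"
    "weather,2024-01-02,Paris,18.0\n"
)
groups = split_by_measurement(list(csv.DictReader(sample)))
for name, rows in groups.items():
    print(name, len(rows))
```

Passing `measurement_name` explicitly avoids the split: the whole file is then written under that single measurement.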

Source code in influxdb_client_3/__init__.py
def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None,
               file_parser_options=None, **kwargs):
    """
    Write data from a file to InfluxDB.

    :param file: The file to write.
    :type file: str
    :param measurement_name: The name of the measurement.
    :type measurement_name: str
    :param tag_columns: Tag columns.
    :type tag_columns: list
    :param timestamp_column: Timestamp column name. Defaults to 'time'.
    :type timestamp_column: str
    :param database: The database to write to. If not provided, uses the database provided during initialization.
    :type database: str
    :param file_parser_options: Function for providing additional arguments for the file parser.
    :type file_parser_options: callable
    :param kwargs: Additional arguments to pass to the write API.
    """
    if database is None:
        database = self._database

    try:
        table = UploadFile(file, file_parser_options).load_file()
        df = table.to_pandas() if isinstance(table, pa.Table) else table
        self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database,
                                **kwargs)
    except Exception as e:
        raise e

write_dataframe(df, measurement, timestamp_column, tags=None, timestamp_timezone=None, database=None, **kwargs)

Write a DataFrame to InfluxDB.

This method supports both pandas and polars DataFrames, automatically detecting the DataFrame type and using the appropriate serializer.

:param df: The DataFrame to write. Can be a pandas or polars DataFrame.
:type df: pandas.DataFrame or polars.DataFrame
:param measurement: The name of the measurement to write to.
:type measurement: str
:param timestamp_column: The name of the column containing timestamps. This parameter is required for consistency between pandas and polars.
:type timestamp_column: str
:param tags: List of column names to use as tags. Remaining columns will be fields.
:type tags: list[str], optional
:param timestamp_timezone: Timezone for the timestamp column (e.g., 'UTC', 'America/New_York').
:type timestamp_timezone: str, optional
:param database: The database to write to. If not provided, uses the database from initialization.
:type database: str, optional
:param kwargs: Additional arguments to pass to the write API.
:raises TypeError: If df is not a pandas or polars DataFrame.
:raises InfluxDBError: If there is an error writing to the database.

Example

>>> import pandas as pd
>>> df = pd.DataFrame({
...     'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
...     'city': ['London', 'Paris'],
...     'temperature': [15.0, 18.0]
... })
>>> client.write_dataframe(
...     df,
...     measurement='weather',
...     timestamp_column='time',
...     tags=['city']
... )
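
To make the rule "remaining columns will be fields" concrete, the following is a minimal sketch of how the columns of the example DataFrame would be partitioned. `split_columns` is a hypothetical helper written for illustration only; it is not part of influxdb_client_3, and the library's own serializer may treat edge cases differently.

```python
from typing import List, Optional, Tuple


def split_columns(
    columns: List[str],
    timestamp_column: str,
    tags: Optional[List[str]] = None,
) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: partition column names into (tags, fields)."""
    tag_set = set(tags or [])
    tag_columns = [c for c in columns if c in tag_set]
    # Everything that is neither the timestamp nor a tag is treated as a field.
    field_columns = [
        c for c in columns if c != timestamp_column and c not in tag_set
    ]
    return tag_columns, field_columns


tag_columns, field_columns = split_columns(
    ["time", "city", "temperature"], timestamp_column="time", tags=["city"]
)
print(tag_columns, field_columns)
```

Under these assumptions, `city` ends up as the only tag and `temperature` as the only field, while `time` supplies the timestamp.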

Source code in influxdb_client_3/__init__.py
def write_dataframe(
    self,
    df: "pd.DataFrame | pl.DataFrame",
    measurement: str,
    timestamp_column: str,
    tags: Optional[List[str]] = None,
    timestamp_timezone: Optional[str] = None,
    database: Optional[str] = None,
    **kwargs
):
    """
    Write a DataFrame to InfluxDB.

    This method supports both pandas and polars DataFrames, automatically detecting
    the DataFrame type and using the appropriate serializer.

    :param df: The DataFrame to write. Can be a pandas or polars DataFrame.
    :type df: pandas.DataFrame or polars.DataFrame
    :param measurement: The name of the measurement to write to.
    :type measurement: str
    :param timestamp_column: The name of the column containing timestamps.
                             This parameter is required for consistency between pandas and polars.
    :type timestamp_column: str
    :param tags: List of column names to use as tags. Remaining columns will be fields.
    :type tags: list[str], optional
    :param timestamp_timezone: Timezone for the timestamp column (e.g., 'UTC', 'America/New_York').
    :type timestamp_timezone: str, optional
    :param database: The database to write to. If not provided, uses the database from initialization.
    :type database: str, optional
    :param kwargs: Additional arguments to pass to the write API.
    :raises TypeError: If df is not a pandas or polars DataFrame.
    :raises InfluxDBError: If there is an error writing to the database.

    Example:
        >>> import pandas as pd
        >>> df = pd.DataFrame({
        ...     'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
        ...     'city': ['London', 'Paris'],
        ...     'temperature': [15.0, 18.0]
        ... })
        >>> client.write_dataframe(
        ...     df,
        ...     measurement='weather',
        ...     timestamp_column='time',
        ...     tags=['city']
        ... )
    """
    if database is None:
        database = self._database

    # Detect DataFrame type
    df_type = str(type(df))
    if 'pandas' not in df_type and 'polars' not in df_type:
        raise TypeError(
            f"Expected a pandas or polars DataFrame, but got {type(df).__name__}. "
            "Please pass a valid DataFrame object."
        )

    try:
        return self._write_api.write(
            bucket=database,
            record=df,
            data_frame_measurement_name=measurement,
            data_frame_tag_columns=tags or [],
            data_frame_timestamp_column=timestamp_column,
            data_frame_timestamp_timezone=timestamp_timezone,
            **kwargs
        )
    except InfluxDBError as e:
        raise e
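
The type check in the listing above works on the string form of the object's type rather than on `isinstance`, which avoids importing pandas or polars just to validate the argument. The sketch below reproduces that idea in isolation. `detect_frame_kind` and the `Fake*` classes are hypothetical names introduced for the demo; the fake classes only imitate the module paths of the real libraries so the example runs without them installed.

```python
def detect_frame_kind(obj) -> str:
    """Return 'pandas' or 'polars' based on the type's repr, else raise TypeError."""
    type_repr = str(type(obj))
    if "pandas" in type_repr:
        return "pandas"
    if "polars" in type_repr:
        return "polars"
    raise TypeError(
        f"Expected a pandas or polars DataFrame, but got {type(obj).__name__}."
    )


# Stand-in classes whose module path mimics the real libraries (demo assumption).
FakePandasFrame = type("DataFrame", (), {"__module__": "pandas.core.frame"})
FakePolarsFrame = type("DataFrame", (), {"__module__": "polars.dataframe.frame"})
FakePandasSeries = type("Series", (), {"__module__": "pandas.core.series"})

print(detect_frame_kind(FakePandasFrame()))
print(detect_frame_kind(FakePolarsFrame()))
# A Series-like type also passes: the substring check is not DataFrame-specific.
print(detect_frame_kind(FakePandasSeries()))
```

One consequence of the substring approach is that any object whose type lives in a `pandas` or `polars` module passes the check, so a non-DataFrame object from those libraries would not trigger this particular `TypeError`.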

query(query, language='sql', mode='all', database=None, **kwargs)

Query data from InfluxDB.

If you want to use query parameters, you can pass them as kwargs:

client.query("select * from cpu where host=$host", query_parameters={"host": "server01"})

:param query: The query to execute on the database.
:param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
:param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk", "reader" or "schema". Defaults to "all".
:param database: The database to query from. If not provided, uses the database provided during initialization.
:param kwargs: Additional arguments to pass to the FlightCallOptions headers. For example, it can be used to set up per request headers.
:keyword query_parameters: The query parameters to use in the query. It should be a dictionary of key-value pairs.
:return: The query result in the specified mode.

Source code in influxdb_client_3/__init__.py
def query(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
    """Query data from InfluxDB.

    If you want to use query parameters, you can pass them as kwargs:

    >>> client.query("select * from cpu where host=$host", query_parameters={"host": "server01"})

    :param query: The query to execute on the database.
    :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
    :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
                 "reader" or "schema". Defaults to "all".
    :param database: The database to query from. If not provided, uses the database provided during initialization.
    :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to
                   set up per request headers.
    :keyword query_parameters: The query parameters to use in the query.
                               It should be a ``dictionary`` of key-value pairs.
    :return: The query result in the specified mode.
    """
    if mode == "polars" and polars is False:
        raise ImportError("Polars is not installed. Please install it with `pip install polars`.")

    if database is None:
        database = self._database

    try:
        return self._query_api.query(query=query, language=language, mode=mode, database=database, **kwargs)
    except ArrowException as e:
        raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")
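
The `query` listing above catches a low-level Arrow exception and re-raises it as a client-specific error. The following sketch shows that translation pattern on its own. `QueryError` and `TransportError` are hypothetical stand-ins for `InfluxDB3ClientQueryError` and `ArrowException`, and the explicit `from e` is an addition in this sketch that the listing does not use.

```python
class QueryError(Exception):
    """Hypothetical stand-in for InfluxDB3ClientQueryError."""


class TransportError(Exception):
    """Hypothetical stand-in for the Arrow-level exception."""


def run_query(execute):
    """Call `execute` and translate transport-level errors into QueryError."""
    try:
        return execute()
    except TransportError as e:
        # `from e` keeps the original exception available as __cause__.
        raise QueryError(f"Error while executing query: {e}") from e


def failing_call():
    raise TransportError("connection refused")


try:
    run_query(failing_call)
except QueryError as err:
    message = str(err)
    cause = err.__cause__
    print(message)
```

Callers then only need to handle one exception type, while the original error remains reachable for debugging.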

query_dataframe(query, language='sql', database=None, frame_type='pandas', **kwargs)

Query data from InfluxDB and return as a DataFrame.

This is a convenience method that wraps query() and returns the result directly as a pandas or polars DataFrame.

:param query: The query to execute on the database.
:type query: str
:param language: The query language to use. Should be "sql" or "influxql". Defaults to "sql".
:type language: str
:param database: The database to query from. If not provided, uses the database from initialization.
:type database: str, optional
:param frame_type: The type of DataFrame to return. Either "pandas" or "polars". Defaults to "pandas".
:type frame_type: Literal["pandas", "polars"]
:param kwargs: Additional arguments to pass to the query API.
:keyword query_parameters: Query parameters as a dictionary of key-value pairs.
:return: Query result as a pandas or polars DataFrame.
:rtype: pandas.DataFrame or polars.DataFrame
:raises ImportError: If polars is requested but not installed.

Example

>>> # Query and get a pandas DataFrame
>>> df = client.query_dataframe("SELECT * FROM weather WHERE city = 'London'")
>>>
>>> # Query and get a polars DataFrame
>>> df = client.query_dataframe(
...     "SELECT * FROM weather",
...     frame_type="polars"
... )

Source code in influxdb_client_3/__init__.py
def query_dataframe(
    self,
    query: str,
    language: str = "sql",
    database: Optional[str] = None,
    frame_type: Literal["pandas", "polars"] = "pandas",
    **kwargs
) -> "pd.DataFrame | pl.DataFrame":
    """
    Query data from InfluxDB and return as a DataFrame.

    This is a convenience method that wraps query() and returns the result
    directly as a pandas or polars DataFrame.

    :param query: The query to execute on the database.
    :type query: str
    :param language: The query language to use. Should be "sql" or "influxql". Defaults to "sql".
    :type language: str
    :param database: The database to query from. If not provided, uses the database from initialization.
    :type database: str, optional
    :param frame_type: The type of DataFrame to return. Either "pandas" or "polars". Defaults to "pandas".
    :type frame_type: Literal["pandas", "polars"]
    :param kwargs: Additional arguments to pass to the query API.
    :keyword query_parameters: Query parameters as a dictionary of key-value pairs.
    :return: Query result as a pandas or polars DataFrame.
    :rtype: pandas.DataFrame or polars.DataFrame
    :raises ImportError: If polars is requested but not installed.

    Example:
        >>> # Query and get a pandas DataFrame
        >>> df = client.query_dataframe("SELECT * FROM weather WHERE city = 'London'")
        >>>
        >>> # Query and get a polars DataFrame
        >>> df = client.query_dataframe(
        ...     "SELECT * FROM weather",
        ...     frame_type="polars"
        ... )
    """
    if frame_type == "polars" and polars is False:
        raise ImportError(
            "Polars is not installed. Please install it with `pip install polars`."
        )

    return self.query(query=query, language=language, mode=frame_type, database=database, **kwargs)
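
The guard at the top of `query_dataframe` fails early when polars is requested but unavailable. The sketch below illustrates one way to express such an optional-dependency check. `HAS_POLARS` and `resolve_mode` are hypothetical names, the use of `importlib.util.find_spec` is an assumption about how availability could be detected (the listing only shows a comparison `polars is False`), and the `ValueError` for unknown frame types is an extra check that the listing does not perform.

```python
import importlib.util

# True when the optional dependency can be imported in this environment.
HAS_POLARS = importlib.util.find_spec("polars") is not None


def resolve_mode(frame_type: str = "pandas") -> str:
    """Hypothetical helper: validate frame_type and return the query mode to use."""
    if frame_type not in ("pandas", "polars"):
        # Extra validation added for this sketch; not present in the listing.
        raise ValueError(
            f"frame_type must be 'pandas' or 'polars', got {frame_type!r}"
        )
    if frame_type == "polars" and not HAS_POLARS:
        raise ImportError(
            "Polars is not installed. Please install it with `pip install polars`."
        )
    return frame_type


print(resolve_mode())
```

The returned value is what `query_dataframe` passes on as `mode`, so `frame_type="polars"` corresponds to calling `query(..., mode="polars")`.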

query_async(query, language='sql', mode='all', database=None, **kwargs) async

Query data from InfluxDB asynchronously.

If you want to use query parameters, you can pass them as kwargs:

await client.query_async("select * from cpu where host=$host", query_parameters={"host": "server01"})

:param query: The query to execute on the database.
:param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
:param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk", "reader" or "schema". Defaults to "all".
:param database: The database to query from. If not provided, uses the database provided during initialization.
:param kwargs: Additional arguments to pass to the FlightCallOptions headers. For example, it can be used to set up per request headers.
:keyword query_parameters: The query parameters to use in the query. It should be a dictionary of key-value pairs.
:return: The query result in the specified mode.

Source code in influxdb_client_3/__init__.py
async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
    """Query data from InfluxDB asynchronously.

    If you want to use query parameters, you can pass them as kwargs:

    >>> await client.query_async("select * from cpu where host=$host", query_parameters={"host": "server01"})

    :param query: The query to execute on the database.
    :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
    :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
                 "reader" or "schema". Defaults to "all".
    :param database: The database to query from. If not provided, uses the database provided during initialization.
    :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to
                   set up per request headers.
    :keyword query_parameters: The query parameters to use in the query.
                               It should be a ``dictionary`` of key-value pairs.
    :return: The query result in the specified mode.
    """
    if mode == "polars" and polars is False:
        raise ImportError("Polars is not installed. Please install it with `pip install polars`.")

    if database is None:
        database = self._database

    try:
        return await self._query_api.query_async(query=query,
                                                 language=language,
                                                 mode=mode,
                                                 database=database,
                                                 **kwargs)
    except ArrowException as e:
        raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")
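
Because `query_async` is a coroutine, several queries can be awaited together. The sketch below shows the idea with `asyncio.gather`. `FakeAsyncClient` is a stand-in that only imitates the call shape of `query_async` and returns canned data; it is not the real client, and `fetch_hosts` is a hypothetical helper.

```python
import asyncio


class FakeAsyncClient:
    """Stand-in with the same call shape as query_async; returns canned rows."""

    async def query_async(self, query, language="sql", mode="all", database=None, **kwargs):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        params = kwargs.get("query_parameters", {})
        return {"query": query, "host": params.get("host")}


async def fetch_hosts(client, hosts):
    """Issue one parameterized query per host and wait for all of them."""
    tasks = [
        client.query_async(
            "select * from cpu where host=$host",
            query_parameters={"host": host},
        )
        for host in hosts
    ]
    # gather() returns results in the same order as the awaitables passed in.
    return await asyncio.gather(*tasks)


results = asyncio.run(fetch_hosts(FakeAsyncClient(), ["server01", "server02"]))
print([r["host"] for r in results])
```

With a real client, the same structure would apply, subject to whatever concurrency limits the server and connection impose.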

get_server_version()

Get the version of the connected InfluxDB server.

This method makes a ping request to the server and extracts the version information from either the response headers or response body.

:return: The version string of the InfluxDB server.
:rtype: str

Source code in influxdb_client_3/__init__.py
def get_server_version(self) -> str:
    """
    Get the version of the connected InfluxDB server.

    This method makes a ping request to the server and extracts the version information
    from either the response headers or response body.

    :return: The version string of the InfluxDB server.
    :rtype: str
    """
    version = None
    (resp_body, _, header) = self._client.api_client.call_api(
        resource_path="/ping",
        method="GET",
        response_type=object
    )

    for key, value in header.items():
        if key.lower() == "x-influxdb-version":
            version = value
            break

    if version is None and isinstance(resp_body, dict):
        version = resp_body['version']

    return version
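
The lookup order described above (header first, then response body) can be sketched as a small pure function. `extract_version` is a hypothetical helper and the version strings are made-up sample values. Unlike the listing, which indexes `resp_body['version']` directly, this sketch uses `.get()` so that a dict body without a `version` key yields `None` instead of raising `KeyError`.

```python
from typing import Any, Mapping, Optional


def extract_version(headers: Mapping[str, str], body: Any) -> Optional[str]:
    """Hypothetical helper: prefer the version header, fall back to the body."""
    for key, value in headers.items():
        # Header names are compared case-insensitively.
        if key.lower() == "x-influxdb-version":
            return value
    if isinstance(body, dict):
        # .get() returns None rather than raising when the key is absent.
        return body.get("version")
    return None


print(extract_version({"X-Influxdb-Version": "3.0.0"}, None))
print(extract_version({}, {"version": "3.1.0"}))
print(extract_version({}, {}))
```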

flush()

Flush any buffered writes to InfluxDB without closing the client.

This method immediately sends all buffered data points to the server when using batching write mode. After flushing, the client remains open and ready for more writes.

For synchronous write mode, this is a no-op since data is written immediately.

Source code in influxdb_client_3/__init__.py
def flush(self):
    """
    Flush any buffered writes to InfluxDB without closing the client.

    This method immediately sends all buffered data points to the server
    when using batching write mode. After flushing, the client remains
    open and ready for more writes.

    For synchronous write mode, this is a no-op since data is written
    immediately.
    """
    self._write_api.flush()
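
To illustrate what flushing means in a batching write mode, here is a minimal sketch of a buffer that sends points in groups. `BatchingWriter` is a simplified stand-in invented for this example; the real write API's batching options and internals are not shown in this section and may differ.

```python
class BatchingWriter:
    """Stand-in batching writer: buffers points and sends them in groups."""

    def __init__(self, send, batch_size=3):
        self._send = send              # callable that receives a list of points
        self._batch_size = batch_size
        self._buffer = []

    def write(self, point):
        self._buffer.append(point)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self):
        """Send whatever is buffered; the writer stays usable afterwards."""
        if self._buffer:
            self._send(list(self._buffer))
            self._buffer.clear()


sent = []
writer = BatchingWriter(sent.append, batch_size=3)
writer.write("p1")
writer.write("p2")
pending_before_flush = list(sent)  # nothing sent yet: the batch is not full
writer.flush()
writer.write("p3")                 # the writer still accepts writes after flush
writer.flush()
print(sent)
```

In this sketch, calling `flush()` on an empty buffer does nothing, which mirrors the no-op behavior described for synchronous write mode.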

close()

Close the client and clean up resources.

Source code in influxdb_client_3/__init__.py
def close(self):
    """Close the client and clean up resources."""
    self._write_api.close()
    self._query_api.close()
    self._client.close()
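
Since `close()` releases the underlying resources, it is worth making sure it runs even when an error interrupts the work. The sketch below uses `contextlib.closing` from the standard library, which works with any object that has a `close()` method. `FakeClient` is a stand-in for the demo; whether the real client can be used directly in a `with` statement is not shown in this section.

```python
from contextlib import closing


class FakeClient:
    """Stand-in exposing query() and close(); records whether close() ran."""

    def __init__(self):
        self.closed = False

    def query(self, query):
        if self.closed:
            raise RuntimeError("client is closed")
        return []

    def close(self):
        self.closed = True


client = FakeClient()
try:
    with closing(client) as c:
        c.query("SELECT 1")
        raise ValueError("simulated failure mid-work")
except ValueError:
    pass

# close() ran on exit from the with-block despite the exception.
print(client.closed)
```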