summaryrefslogtreecommitdiffstats
path: root/nova/compute
diff options
context:
space:
mode:
authorBrian Elliott <brian.elliott@rackspace.com>2012-06-21 04:25:24 +0000
committerBrian Elliott <brian.elliott@rackspace.com>2012-08-20 15:46:53 +0000
commitc7d812a35bf4ef42907366c3f674fd623cd46905 (patch)
treecc0cb8ac745d1fae51dafe7c24f499d5b0b27ea8 /nova/compute
parent740e93aae891d6c20f38b091ad9f54d71db0d7f7 (diff)
Keep the ComputeNode model updated with usage
Keep the compute host's ComputeNode model in sync with the level of resource usage. This enables the ComputeNode model to be used as a basis for scheduling decisions rather than forcing scheduler to calculate free resources from an instance_get_all on each request. Resources like memory and disk are claimed as instances are built or deleted. There is also support for configurable compute node stats (a generic key/value store) for extensible advertising of other usage stats that may be useful for a particular scheduler implementation. Additionally, there is a periodic task on the compute host that audits actual resource consumption at the virt layer to ensure that the database stays in sync. This change partially implements blueprint: scheduler-resource-race This patch complements: https://review.openstack.org/#/c/9540/ (build re-scheduling support) Change-Id: Ibbe3839a054f8b80664b413d47f766ca8d68e3f2
Diffstat (limited to 'nova/compute')
-rw-r--r--nova/compute/manager.py31
-rw-r--r--nova/compute/resource_tracker.py450
-rw-r--r--nova/compute/stats.py118
3 files changed, 591 insertions, 8 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 6d499f9fb..e45c3333d 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -47,6 +47,7 @@ from nova import block_device
from nova import compute
from nova.compute import instance_types
from nova.compute import power_state
+from nova.compute import resource_tracker
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import utils as compute_utils
@@ -278,11 +279,16 @@ class ComputeManager(manager.SchedulerDependentManager):
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
+ self.resource_tracker = resource_tracker.ResourceTracker(self.host,
+ self.driver)
+
def _instance_update(self, context, instance_uuid, **kwargs):
"""Update an instance in the database using kwargs as value."""
(old_ref, instance_ref) = self.db.instance_update_and_get_original(
context, instance_uuid, kwargs)
+ self.resource_tracker.update_load_stats_for_instance(context, old_ref,
+ instance_ref)
notifications.send_update(context, old_ref, instance_ref)
return instance_ref
@@ -523,10 +529,16 @@ class ComputeManager(manager.SchedulerDependentManager):
network_info = self._allocate_network(context, instance,
requested_networks)
try:
- block_device_info = self._prep_block_device(context, instance)
- instance = self._spawn(context, instance, image_meta,
- network_info, block_device_info,
- injected_files, admin_password)
+ memory_mb_limit = filter_properties.get('memory_mb_limit',
+ None)
+ with self.resource_tracker.instance_resource_claim(context,
+ instance, memory_mb_limit=memory_mb_limit):
+ block_device_info = self._prep_block_device(context,
+ instance)
+ instance = self._spawn(context, instance, image_meta,
+ network_info, block_device_info,
+ injected_files, admin_password)
+
except exception.InstanceNotFound:
raise # the instance got deleted during the spawn
except Exception:
@@ -920,6 +932,8 @@ class ComputeManager(manager.SchedulerDependentManager):
self.db.instance_destroy(context, instance_uuid)
system_meta = self.db.instance_system_metadata_get(context,
instance_uuid)
+ # mark resources free
+ self.resource_tracker.free_resources(context)
self._notify_about_instance_usage(context, instance, "delete.end",
system_metadata=system_meta)
@@ -2902,13 +2916,14 @@ class ComputeManager(manager.SchedulerDependentManager):
@manager.periodic_task
def update_available_resource(self, context):
- """See driver.update_available_resource()
+ """See driver.get_available_resource()
- :param context: security context
- :returns: See driver.update_available_resource()
+ Periodic process that keeps that the compute host's understanding of
+ resource availability and usage in sync with the underlying hypervisor.
+ :param context: security context
"""
- self.driver.update_available_resource(context, self.host)
+ self.resource_tracker.update_available_resource(context)
def _add_instance_fault_from_exc(self, context, instance_uuid, fault,
exc_info=None):
diff --git a/nova/compute/resource_tracker.py b/nova/compute/resource_tracker.py
new file mode 100644
index 000000000..3a43390a0
--- /dev/null
+++ b/nova/compute/resource_tracker.py
@@ -0,0 +1,450 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Track resources like memory and disk for a compute host. Provides the
+scheduler with useful information about availability through the ComputeNode
+model.
+"""
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova.openstack.common import cfg
+from nova.openstack.common import importutils
+from nova.openstack.common import log as logging
+from nova.openstack.common import timeutils
+from nova import utils
+
+resource_tracker_opts = [
+ cfg.IntOpt('claim_timeout_seconds', default=600,
+ help='How long, in seconds, before a resource claim times out'),
+ cfg.StrOpt('compute_stats_class',
+ default='nova.compute.stats.Stats',
+ help='Class that will manage stats for the local compute host')
+]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(resource_tracker_opts)
+
+LOG = logging.getLogger(__name__)
+COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
+
+
+class Claim(object):
+ """A declaration that a compute host operation will require free resources.
+
+ This information will be used to help keep the local compute hosts's
+ ComputeNode model in sync to aid the scheduler in making efficient / more
+ correct decisions with respect to host selection.
+ """
+
+ def __init__(self, claim_id, memory_mb, disk_gb, timeout, *args, **kwargs):
+ self.claim_id = claim_id
+ self.memory_mb = memory_mb
+ self.disk_gb = disk_gb
+ self.timeout = timeout
+ self.expire_ts = timeutils.utcnow_ts() + timeout
+
+ def apply_claim(self, resources):
+ """Adjust the resources required from available resources.
+
+ :param resources: Should be a dictionary-like object that
+ has fields like a compute node
+ """
+ return self._apply(resources)
+
+ def undo_claim(self, resources):
+ return self._apply(resources, sign=-1)
+
+ def is_expired(self):
+ """Determine if this adjustment is old enough that we can assume it's
+ no longer needed.
+ """
+ return timeutils.utcnow_ts() > self.expire_ts
+
+ def _apply(self, resources, sign=1):
+ values = {}
+ values['memory_mb_used'] = (resources['memory_mb_used'] + sign *
+ self.memory_mb)
+ values['free_ram_mb'] = (resources['free_ram_mb'] - sign *
+ self.memory_mb)
+ values['local_gb_used'] = (resources['local_gb_used'] + sign *
+ self.disk_gb)
+ values['free_disk_gb'] = (resources['free_disk_gb'] - sign *
+ self.disk_gb)
+
+ return values
+
+ def __str__(self):
+ return "[Claim %d: %d MB memory, %d GB disk]" % (self.claim_id,
+ self.memory_mb, self.disk_gb)
+
+
+class ResourceContextManager(object):
+ def __init__(self, context, claim, tracker):
+ self.context = context
+ self.claim = claim
+ self.tracker = tracker
+
+ def __enter__(self):
+ if not self.claim and not self.tracker.disabled:
+ # insufficient resources to complete request
+ raise exception.ComputeResourcesUnavailable()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if not self.claim:
+ return
+
+ if exc_type is None:
+ self.tracker.finish_resource_claim(self.claim)
+ else:
+ self.tracker.abort_resource_claim(self.context, self.claim)
+
+
+class ResourceTracker(object):
+ """Compute helper class for keeping track of resource usage as instances
+ are built and destroyed.
+ """
+
+ def __init__(self, host, driver):
+ self.host = host
+ self.driver = driver
+ self.compute_node = None
+ self.next_claim_id = 1
+ self.claims = {}
+ self.stats = importutils.import_object(FLAGS.compute_stats_class)
+
+ def resource_claim(self, context, *args, **kwargs):
+ claim = self.begin_resource_claim(context, *args, **kwargs)
+ return ResourceContextManager(context, claim, self)
+
+ def instance_resource_claim(self, context, instance_ref, *args, **kwargs):
+ claim = self.begin_instance_resource_claim(context, instance_ref,
+ *args, **kwargs)
+ return ResourceContextManager(context, claim, self)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def begin_instance_resource_claim(self, context, instance_ref, *args,
+ **kwargs):
+ """Method to begin a resource claim for a new instance."""
+ memory_mb = instance_ref['memory_mb']
+ disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
+
+ claim = self._do_begin_resource_claim(context, memory_mb, disk_gb,
+ *args, **kwargs)
+ if claim:
+ # also update load stats related to new instances firing up -
+ values = self._create_load_stats(context, instance_ref)
+ self.compute_node = self._update(context, values)
+ return claim
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def begin_resource_claim(self, context, memory_mb, disk_gb,
+ memory_mb_limit=None, timeout=None, *args, **kwargs):
+ """Indicate that some resources are needed for an upcoming compute
+ host operation.
+
+ This should be called any time the compute node is about to perform
+ an operation that will consume resources.
+
+ :param memory_mb: security context
+ :param memory_mb: Memory in MB to be claimed
+ :param root_gb: Disk in GB to be claimed
+ :param memory_mb_limit: Memory in MB that is the maximum to allocate on
+ this node. May exceed installed physical memory if
+ oversubscription is the desired behavior.
+ :param timeout: How long, in seconds, the operation that requires
+ these resources should take to actually allocate what
+ it needs from the hypervisor. If the timeout is
+ exceeded, the new resource claim will assume caller
+ before releasing the resources.
+ :returns: An integer 'claim ticket'. This should be turned into
+ finalize a resource claim or free resources after the
+ compute operation is finished. Returns None if the claim
+ failed.
+ """
+
+ return self._do_begin_resource_claim(context, memory_mb, disk_gb,
+ memory_mb_limit, timeout, *args, **kwargs)
+
+ def _do_begin_resource_claim(self, context, memory_mb, disk_gb,
+ memory_mb_limit=None, timeout=None, *args, **kwargs):
+
+ if self.disabled:
+ return
+
+ if not timeout:
+ timeout = FLAGS.claim_timeout_seconds
+
+ memory_mb = abs(memory_mb)
+ disk_gb = abs(disk_gb)
+
+ msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
+ "GB, mem limit %(memory_mb_limit)s") % locals()
+ LOG.audit(msg)
+
+ if not memory_mb_limit:
+ # default to total memory:
+ memory_mb_limit = self.compute_node['memory_mb']
+
+ free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
+
+ # Installed memory and usage info:
+ msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
+ "%(free_mem)d") % dict(
+ total_mem=self.compute_node['memory_mb'],
+ used_mem=self.compute_node['memory_mb_used'],
+ free_mem=self.compute_node['local_gb_used'])
+ LOG.audit(msg)
+
+ # Oversubscribed memory policy info:
+ msg = _("Limit: %(memory_mb_limit)d MB, free: %(free_ram_mb)d") % \
+ locals()
+ LOG.audit(msg)
+
+ if memory_mb > free_ram_mb:
+ msg = _("Unable to claim resources. Free memory %(free_ram_mb)d "
+ "MB < requested memory %(memory_mb)d MB") % locals()
+ LOG.info(msg)
+ return None
+
+ if disk_gb > self.compute_node['free_disk_gb']:
+ msg = _("Unable to claim resources. Free disk %(free_disk_gb)d GB"
+ " < requested disk %(disk_gb)d GB") % dict(
+ free_disk_gb=self.compute_node['free_disk_gb'],
+ disk_gb=disk_gb)
+ LOG.info(msg)
+ return None
+
+ claim_id = self._get_next_id()
+ c = Claim(claim_id, memory_mb, disk_gb, timeout, *args, **kwargs)
+
+ # adjust compute node usage values and save so scheduler will see it:
+ values = c.apply_claim(self.compute_node)
+ self.compute_node = self._update(context, values)
+
+ # keep track of this claim until we know whether the compute operation
+ # was successful/completed:
+ self.claims[claim_id] = c
+ return c
+
+ def finish_resource_claim(self, claim):
+ """Indicate that the compute operation that previously claimed the
+ resources identified by 'claim' has now completed and the resources
+ have been allocated at the virt layer.
+
+ Calling this keeps the available resource data more accurate and
+ timely than letting the claim timeout elapse and waiting for
+ update_available_resource to reflect the changed usage data.
+
+ :param claim: A claim indicating a set of resources that were
+ previously claimed.
+ """
+ if self.disabled:
+ return
+
+ if self.claims.pop(claim.claim_id, None):
+ LOG.info(_("Finishing claim: %s") % claim)
+ else:
+ LOG.info(_("Can't find claim %d. It may have been 'finished' "
+ "twice, or it has already timed out."), claim.claim_id)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def abort_resource_claim(self, context, claim):
+ """Indicate that the operation that claimed the resources identified by
+ 'claim_id' has either failed or been aborted and the resources are no
+ longer needed.
+
+ :param claim: A claim ticket indicating a set of resources that were
+ previously claimed.
+ """
+ if self.disabled:
+ return
+
+ # un-claim the resources:
+ if self.claims.pop(claim.claim_id, None):
+ LOG.info(_("Aborting claim: %s") % claim)
+ values = claim.undo_claim(self.compute_node)
+ self.compute_node = self._update(context, values)
+ else:
+ # can't find the claim. this may mean the claim already timed
+ # out or it was already explicitly finished/aborted.
+ LOG.info(_("Claim %d not found. It either timed out or was "
+ "already explicitly finished/aborted"), claim.claim_id)
+
+ @property
+ def disabled(self):
+ return self.compute_node is None
+
+ def free_resources(self, context):
+ """A compute operation finished freeing up resources. Update compute
+ model to reflect updated resource usage.
+
+ (The hypervisor may not immediately 'GC' all resources, so ask directly
+ to see what's available to update the compute node model.)
+ """
+ self.update_available_resource(context.elevated())
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def update_available_resource(self, context):
+ """Override in-memory calculations of compute node resource usage based
+ on data audited from the hypervisor layer.
+
+ Add in resource claims in progress to account for operations that have
+ declared a need for resources, but not necessarily retrieved them from
+ the hypervisor layer yet.
+ """
+ # ask hypervisor for its view of resource availability &
+ # usage:
+ resources = self.driver.get_available_resource()
+ if not resources:
+ # The virt driver does not support this function
+ LOG.warn(_("Virt driver does not support "
+ "'get_available_resource' Compute tracking is disabled."))
+ self.compute_node = None
+ self.claims = {}
+ return
+
+ # Confirm resources dictionary contains expected keys:
+ self._verify_resources(resources)
+
+ resources['free_ram_mb'] = resources['memory_mb'] - \
+ resources['memory_mb_used']
+ resources['free_disk_gb'] = resources['local_gb'] - \
+ resources['local_gb_used']
+
+ LOG.audit(_("free_ram_mb: %s") % resources['free_ram_mb'])
+ LOG.audit(_("free_disk_gb: %s") % resources['free_disk_gb'])
+ # Apply resource claims representing in-progress operations to
+ # 'resources'. This may over-estimate the amount of resources in use,
+ # at least until the next time 'update_available_resource' runs.
+ self._apply_claims(resources)
+
+ # also generate all load stats:
+ values = self._create_load_stats(context)
+ resources.update(values)
+
+ if not self.compute_node:
+ # we need a copy of the ComputeNode record:
+ service = self._get_service(context)
+ if not service:
+ # no service record, disable resource
+ return
+
+ compute_node_ref = service['compute_node']
+ if compute_node_ref:
+ self.compute_node = compute_node_ref[0]
+
+ if not self.compute_node:
+ # Need to create the ComputeNode record:
+ resources['service_id'] = service['id']
+ self.compute_node = self._create(context, resources)
+ LOG.info(_('Compute_service record created for %s ') % self.host)
+
+ else:
+ # just update the record:
+ self.compute_node = self._update(context, resources,
+ prune_stats=True)
+ LOG.info(_('Compute_service record updated for %s ') % self.host)
+
+ @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+ def update_load_stats_for_instance(self, context, old_ref, instance_ref):
+ """Update workload stats for the local compute host."""
+
+ if self.disabled:
+ return
+
+ values = {}
+ self.stats.update_stats_for_instance(old_ref, instance_ref)
+ values['stats'] = self.stats
+
+ values['current_workload'] = self.stats.calculate_workload()
+ values['running_vms'] = self.stats.num_instances
+ values['vcpus_used'] = self.stats.num_vcpus_used
+
+ self.compute_node = self._update(context.elevated(), values)
+
+ def _apply_claims(self, resources):
+ """Apply in-progress resource claims to the 'resources' dict from the
+ virt layer
+ """
+ for claim_id in self.claims.keys():
+ c = self.claims[claim_id]
+ if c.is_expired():
+ # if this claim is expired, just expunge it
+ LOG.info(_("Expiring resource claim %d"), claim_id)
+ self.claims.pop(claim_id)
+ else:
+ values = c.apply_claim(resources)
+ resources.update(values)
+
+ def _create(self, context, values):
+ """Create the compute node in the DB"""
+ # initialize load stats from existing instances:
+ compute_node = db.compute_node_create(context, values)
+ return compute_node
+
+ def _create_load_stats(self, context, instance=None):
+ """For each existing instance generate load stats for the compute
+ node record.
+ """
+ values = {}
+
+ if instance:
+ instances = [instance]
+ else:
+ self.stats.clear() # re-generating all, so clear old stats
+
+ # grab all instances that are not yet DELETED
+ filters = {'host': self.host, 'deleted': False}
+ instances = db.instance_get_all_by_filters(context,
+ {'host': self.host})
+
+ for instance in instances:
+ self.stats.add_stats_for_instance(instance)
+
+ values['current_workload'] = self.stats.calculate_workload()
+ values['running_vms'] = self.stats.num_instances
+ values['vcpus_used'] = self.stats.num_vcpus_used
+ values['stats'] = self.stats
+ return values
+
+ def _get_next_id(self):
+ next_id = self.next_claim_id
+ self.next_claim_id += 1
+ return next_id
+
+ def _get_service(self, context):
+ try:
+ return db.service_get_all_compute_by_host(context,
+ self.host)[0]
+ except exception.NotFound:
+ LOG.warn(_("No service record for host %s"), self.host)
+
+ def _update(self, context, values, prune_stats=False):
+ """Persist the compute node updates to the DB"""
+ return db.compute_node_update(context, self.compute_node['id'],
+ values, prune_stats)
+
+ def _verify_resources(self, resources):
+ resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info",
+ "vcpus_used", "memory_mb_used", "local_gb_used"]
+
+ missing_keys = [k for k in resource_keys if k not in resources]
+ if missing_keys:
+ reason = _("Missing keys: %s") % missing_keys
+ raise exception.InvalidInput(reason=reason)
diff --git a/nova/compute/stats.py b/nova/compute/stats.py
new file mode 100644
index 000000000..f67615028
--- /dev/null
+++ b/nova/compute/stats.py
@@ -0,0 +1,118 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova.compute import task_states
+from nova.compute import vm_states
+from nova import db
+from nova.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class Stats(dict):
+ """Handler for updates to compute node workload stats."""
+
+ def add_stats_for_instance(self, instance):
+ self._increment("num_task_%s" % instance['task_state'])
+
+ self._increment("num_vm_%s" % instance['vm_state'])
+ self._increment("num_instances")
+
+ os_type = instance['os_type']
+ self._increment("num_os_type_%s" % os_type)
+
+ proj_id = instance['project_id']
+ self._increment("num_proj_%s" % proj_id)
+
+ x = self.get("num_vcpus_used", 0)
+ self["num_vcpus_used"] = x + instance["vcpus"]
+
+ def calculate_io_workload(self):
+ """Calculate an I/O based load by counting I/O heavy operations"""
+
+ def _get(state, state_type):
+ key = "num_%s_%s" % (state_type, state)
+ return self.get(key, 0)
+
+ num_builds = _get(vm_states.BUILDING, "vm")
+ num_migrations = _get(task_states.RESIZE_MIGRATING, "task")
+ num_rebuilds = _get(task_states.REBUILDING, "task")
+ num_resizes = _get(task_states.RESIZE_PREP, "task")
+ num_snapshots = _get(task_states.IMAGE_SNAPSHOT, "task")
+ num_backups = _get(task_states.IMAGE_BACKUP, "task")
+
+ return (num_builds + num_rebuilds + num_resizes + num_migrations +
+ num_snapshots + num_backups)
+
+ def calculate_workload(self):
+ """Calculate current load of the compute host based on
+ task states.
+ """
+ current_workload = 0
+ for k in self:
+ if k.startswith("num_task") and not k.endswith("None"):
+ current_workload += self[k]
+ return current_workload
+
+ @property
+ def num_instances(self):
+ return self.get("num_instances", 0)
+
+ def num_instances_for_project(self, project_id):
+ key = "num_proj_%s" % project_id
+ return self.get(key, 0)
+
+ def num_os_type(self, os_type):
+ key = "num_os_type_%s" % os_type
+ return self.get(key, 0)
+
+ @property
+ def num_vcpus_used(self):
+ return self.get("num_vcpus_used", 0)
+
+ def update_stats_for_instance(self, old_instance, instance):
+ """Update stats after an instance is changed."""
+
+ old_vm_state = old_instance['vm_state']
+ new_vm_state = instance['vm_state']
+
+ if old_vm_state != new_vm_state:
+ self._decrement("num_vm_%s" % old_vm_state)
+ self._increment("num_vm_%s" % new_vm_state)
+
+ if new_vm_state == vm_states.DELETED:
+ self._decrement("num_instances")
+
+ self._decrement("num_os_type_%s" % old_instance['os_type'])
+
+ self._decrement("num_proj_%s" % old_instance["project_id"])
+
+ x = self.get("num_vcpus_used", 0)
+ self["num_vcpus_used"] = x - old_instance['vcpus']
+
+ old_task_state = old_instance['task_state']
+ new_task_state = instance['task_state']
+
+ if old_task_state != new_task_state:
+ self._decrement("num_task_%s" % old_task_state)
+ self._increment("num_task_%s" % new_task_state)
+
+ def _decrement(self, key):
+ x = self.get(key, 0)
+ self[key] = x - 1
+
+ def _increment(self, key):
+ x = self.get(key, 0)
+ self[key] = x + 1