summaryrefslogtreecommitdiffstats
path: root/ldap/servers/slapd/index_subsystem.c
blob: 890e6c857c120594819efdc84beec099371008a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
/** BEGIN COPYRIGHT BLOCK
 * This Program is free software; you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free Software
 * Foundation; version 2 of the License.
 * 
 * This Program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along with
 * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
 * Place, Suite 330, Boston, MA 02111-1307 USA.
 * 
 * In addition, as a special exception, Red Hat, Inc. gives You the additional
 * right to link the code of this Program with code not covered under the GNU
 * General Public License ("Non-GPL Code") and to distribute linked combinations
 * including the two, subject to the limitations in this paragraph. Non-GPL Code
 * permitted under this exception must only link to the code of this Program
 * through those well defined interfaces identified in the file named EXCEPTION
 * found in the source code files (the "Approved Interfaces"). The files of
 * Non-GPL Code may instantiate templates or use macros or inline functions from
 * the Approved Interfaces without causing the resulting work to be covered by
 * the GNU General Public License. Only Red Hat, Inc. may make changes or
 * additions to the list of Approved Interfaces. You must obey the GNU General
 * Public License in all respects for all of the Program code and other code used
 * in conjunction with the Program except the Non-GPL Code covered by this
 * exception. If you modify this file, you may extend this exception to your
 * version of the file, but you are not obligated to do so. If you do not wish to
 * provide this exception without modification, you must delete this exception
 * statement from your version and license this file solely under the GPL without
 * exception. 
 * 
 * 
 * Copyright (C) 2005 Red Hat, Inc.
 * All rights reserved.
 * END COPYRIGHT BLOCK **/

#ifdef HAVE_CONFIG_H
#  include <config.h>
#endif


/* The indexing subsystem
 * ----------------------
 *
 * This provides support for indexing plugins and assigning
 * those plugins to sub-filters of a search filter.  Currently
 * the old indexing code still exists and operates on those
 * indexes which do not have a plugin assigned.  This indexing
 * abstraction is intended to eventually decouple the index mechanics
 * from the back-end where that is possible.  Hopefully, while
 * supporting the needs of virtual attribute indexes, it will allow
 * easier integration of other back ends.
 *
 */

/* includes */
#include "slap.h"
#include "./back-ldbm/back-ldbm.h"
#include "./back-ldbm/idlapi.h"
#include "index_subsys.h"

#define INDEX_IDLIST_INITIAL_SIZE	128		/* make this a good size to avoid constant reallocs */

/* data */
static void **idl_api;

struct _indexLinkedList
{
	void *pNext;
	void *pPrev;
};
typedef struct _indexLinkedList indexLinkedList;

struct _indexEntry
{
	indexLinkedList list;
	char *indexedAttribute;
	Slapi_Filter *indexfilter;
	char *indexfilterstr;
	char **associated_attrs;
	void *user_data;
	Slapi_DN *namespace_dn;
    index_search_callback lookup_func; /* search call back */
};
typedef struct _indexEntry indexEntry;

struct _indexPlugin
{
	indexLinkedList list;
	char *id;
	indexEntry *indexes;
	index_validate_callback validate_op;
};
typedef struct _indexPlugin indexPlugin;

struct _globalIndexCache
{
	indexPlugin *pPlugins;
	indexEntry **ppIndexIndex; /* sorted list with key: indexEntry.indexedAttribute */
	int index_count;
	PRRWLock *cache_lock;
};
typedef struct _globalIndexCache globalIndexCache;

static globalIndexCache *theCache = 0;

/* prototypes */
static int index_subsys_decoder_done(Slapi_Filter *f);
static int index_subsys_assign_decoders(Slapi_Filter *f);
static int index_subsys_assign_decoder(Slapi_Filter *f);
static int index_subsys_group_decoders(Slapi_Filter *f);
static int index_subsys_unlink_subfilter(Slapi_Filter *fcomplex, Slapi_Filter *fsub);
static int index_subsys_index_matches_associated(indexEntry *index, Slapi_Filter *f);

/* matching alg - note : values 0/1/2/3 supported right now*/
#define INDEX_MATCH_NONE		0
#define INDEX_MATCH_EQUALITY	1
#define INDEX_MATCH_PRESENCE	2
#define INDEX_MATCH_SUBSTRING	3
#define INDEX_MATCH_COMPLEX		4
static int index_subsys_index_matches_filter(indexEntry *index, Slapi_Filter *f);

static void index_subsys_read_lock()
{
	PR_RWLock_Rlock(theCache->cache_lock);
}

static void index_subsys_write_lock()
{
	PR_RWLock_Wlock(theCache->cache_lock);
}

static void index_subsys_unlock()
{
	PR_RWLock_Unlock(theCache->cache_lock);
}

int slapi_index_entry_list_create(IndexEntryList **list)
{
	if(idl_api)
		*list = (IndexEntryList*)IDList_alloc(idl_api, INDEX_IDLIST_INITIAL_SIZE);
	else
		*list = 0;

	return !(*list);
}

int slapi_index_entry_list_add(IndexEntryList **list, IndexEntryID id)
{
	if(idl_api)
		IDList_insert(idl_api, (IDList **)list, (ID)id);

	return 0;  /* no way to tell failure */
}


static int index_subsys_index_matches_filter(indexEntry *index, Slapi_Filter *f)
{
	int ret = INDEX_MATCH_NONE;

	/* simple filters only right now */
	if(slapi_attr_types_equivalent(index->indexedAttribute, f->f_type))
	{
		/* ok we have some type of match, lets find out which */

		switch(index->indexfilter->f_choice)
		{
		case LDAP_FILTER_PRESENT:
			/* present means "x=*" */
			if(f->f_choice == LDAP_FILTER_PRESENT)
				ret = INDEX_MATCH_PRESENCE;
			break;
	    case LDAP_FILTER_SUBSTRINGS:
			/* our equality filters look like this "x=**"
			 * that means the filter will be assigned
			 * a substring f_choice by the filter code
			 * in str2filter.c
			 * but we need to differentiate so we take
			 * advantage of the fact that this creates a
			 * special substring filter with no substring
			 * to look for...
			 */
			if(	index->indexfilter->f_sub_initial == 0 &&
				index->indexfilter->f_sub_any == 0 &&
				index->indexfilter->f_sub_final == 0
				)
			{
				/* this is an index equality filter */
				if(f->f_choice == LDAP_FILTER_EQUALITY)
					ret = INDEX_MATCH_EQUALITY;
			}
			else
			{
				/* this is a regualar substring filter */
				if(f->f_choice == LDAP_FILTER_SUBSTRINGS)
					ret = INDEX_MATCH_SUBSTRING;
			}
			
			break;

		default:
			/* we don't know about any other type yet */
			break;
		}
	}

	return ret;
}

/* index_subsys_assign_filter_decoders
 * -----------------------------------
 * assigns index plugins to sub-filters
 */
int index_subsys_assign_filter_decoders(Slapi_PBlock *pb)
{
	int				rc;
	Slapi_Filter	*f;
	char			*subsystem = "index_subsys_assign_filter_decoders";
	char			logbuf[ 1024 ];

	/* extract the filter */
	slapi_pblock_get(pb, SLAPI_SEARCH_FILTER, &f);   

	if ( LDAPDebugLevelIsSet( LDAP_DEBUG_FILTER ) && NULL != f ) {
		logbuf[0] = '\0';
		slapi_log_error( SLAPI_LOG_FATAL, subsystem, "before: %s\n",
				slapi_filter_to_string(f, logbuf, sizeof(logbuf)));
	}

	/* find decoders */
	rc = index_subsys_assign_decoders(f);

	if ( LDAPDebugLevelIsSet( LDAP_DEBUG_FILTER ) && NULL != f ) {
		logbuf[0] = '\0';
		slapi_log_error( SLAPI_LOG_FATAL, subsystem, " after: %s\n",
				slapi_filter_to_string(f, logbuf, sizeof(logbuf)));
	}

	return rc;
}

/* index_subsys_filter_decoders_done
 * ---------------------------------
 * removes assigned index plugins in
 * sub-filters
 */
int index_subsys_filter_decoders_done(Slapi_PBlock *pb)
{
	Slapi_Filter *f;

	/* extract the filter */
	slapi_pblock_get(pb, SLAPI_SEARCH_FILTER, &f);   

	/* remove decoders */
	return index_subsys_decoder_done(f);
}


/* index_subsys_unlink_subfilter
 * -----------------------------
 * removes the sub-filter from
 * the complex filter list
 * does NOT deallocate the sub-filter
 */
static int index_subsys_unlink_subfilter(Slapi_Filter *fcomplex, Slapi_Filter *fsub)
{
	int ret = -1;
	Slapi_Filter *f;
	Slapi_Filter *f_prev = 0;

	for(f=fcomplex->f_list; f != NULL; f = f->f_next)
	{
		if(f == fsub)
		{
			if(f_prev)
			{
				f_prev->f_next = f->f_next;
				f->f_next = 0;
				ret = 0;
				break;
			}
			else
			{
				/* was at the beginning of the list */
				fcomplex->f_list = f->f_next;
				f->f_next = 0;
				ret = 0;
				break;
			}
		}

		f_prev = f;
	}

	return ret;
}

/* index_subsys_index_matches_associated
 * -------------------------------------
 * determines if there is any kind of match
 * between the specified type and the index.
 *
 * matches could be on the indexed type or
 * on any associated attribute
 * returns:
 * 0 when false
 * non-zero when true
 */
static int index_subsys_index_matches_associated(indexEntry *index, Slapi_Filter *f)
{
	int ret = 0;
	char **associated_attrs = index->associated_attrs;

	if(INDEX_MATCH_NONE != index_subsys_index_matches_filter(index, f))
	{
		/* matched on indexed attribute */
		ret = -1;
		goto bail;
	}

	/* check associated attributes */
	if(associated_attrs)
	{
		int i;
		char *type = f->f_type;

		for(i=0; associated_attrs[i]; i++)
		{
			if(slapi_attr_types_equivalent(associated_attrs[i], type))
			{
				/* matched on associated attribute */
				ret = -1;
				break;
			}
		}
	}

bail:
	return ret;
}


/* index_subsys_flatten_filter
 * ---------------------------
 * takes a complex filter as argument (assumed)
 * and merges all compatible complex sub-filters
 * such that their list of sub-filters are moved
 * to the main list of sub-filters in f.
 *
 * This "flattens" the filter so that there are
 * the minimum number of nested complex filters
 * possible.
 *
 * What is a "compatible complex sub-filter?"
 * Answer: a complex sub-filter which is of the
 * same type (AND or OR) as the containing complex
 * filter and which is either assigned the same
 * index decoder or no index decoder is assigned to
 * either complex filter.
 *  
 * This function assumes that it has already
 * been called for every complex sub-filter of f
 * i.e. it only looks one layer deep.
 *
 * Note that once a filter has been processed in
 * this fashion, rearranging the filter based
 * on the optimal evaluation order becomes very
 * much simpler.  It should also have benefits for
 * performance when a filter is evaluated many
 * times since a linear list traversal is faster than a
 * context switch to recurse down into a complex
 * filter structure.
 *
 */
static void index_subsys_flatten_filter(Slapi_Filter *flist)
{
	struct slapi_filter *f = flist->f_list;
	struct slapi_filter *fprev = 0;
	struct slapi_filter *flast = 0;

	while(f)
	{
		if(f->assigned_decoder == flist->assigned_decoder)
		{
			/* mmmk, but is the filter complex? */
			if(f->f_choice == LDAP_FILTER_AND || f->f_choice == LDAP_FILTER_OR)
			{
				if(f->f_choice == flist->f_choice)
				{
					/* flatten this, and remember
					 * we expect that any complex sub-filters
					 * have already been flattened, so we
					 * simply transfer the contents of this
					 * sub-filter to the main sub-filter and
					 * remove this complex sub-filter
					 *
					 * take care not to change the filter
					 * ordering in any way (it may have been
					 * optimized)
					 */
					struct slapi_filter *fnext = f->f_next;
					struct slapi_filter *fsub = 0;

					while(f->f_list)
					{
						fsub = f->f_list;
						index_subsys_unlink_subfilter(f, f->f_list);
						fsub->f_next = fnext;

						if(flast)
						{
							/* we inserted something before - insert after */
							flast->f_next = fsub;
						}
						else
						{
							/* first insertion */
							if(fprev)
							{
								fprev->f_next = fsub;
							}
							else
							{
								/* insert at list start */
								flist->f_list = fsub;
							}
							
							fprev = fsub;
						}

						flast = fsub;
					}

					/* zero for dont recurse - recursing would
					 * be bad since we have created a mutant
					 * complex filter with no children
					 */
					slapi_filter_free(f, 0);
					f = fnext;
				}
				else
				{
					fprev = f;
					f = f->f_next;
				}
			}
			else
			{
				fprev = f;
				f = f->f_next;
			}
		}
		else
		{
			fprev = f;
			f = f->f_next;
		}
	}
}

/* index_subsys_group_decoders
 * ---------------------------
 * looks for grouping opportunities
 * such that a complex filter may
 * be assigned to a single index.
 *
 * it is assumed that any complex sub-filters
 * have already been assigned decoders 
 * using this function if it
 * was possible to do so
 */
static int index_subsys_group_decoders(Slapi_Filter *flist)
{
	int ret = 0;
	struct slapi_filter	*f = 0;
	struct slapi_filter *f_indexed = 0;
	struct slapi_filter	*fhead = 0;
	int index_count = 0;
	int matched = 1;

	switch(flist->f_choice)
	{
	case LDAP_FILTER_AND:
	case LDAP_FILTER_OR:
		break;

	default:
		/* any other result not handled by this code */
		goto bail;
	}

	/* make sure we start with an unassigned filter */
	flist->assigned_decoder = 0;

	/* Since this function is about optimal grouping of complex filters,
	 * lets explain what is happening here:
	 * 
	 * Beyond detecting that what was passed in is already optimal,
	 * there are 4 basic problems we need to solve here:
	 *
	 *          Input                     this function           Output
	 * 1) (&(indexed)(other)(associated)) -> X -> (&(&(indexed)(associated))(other))
	 * 2) (&(&(indexed)(other))(associated)) -> X -> (&(&(indexed)(associated))(other))
	 * 3) (&(&(associated)(other))(indexed)) -> X -> (&(&(indexed)(associated))(other))
	 * 4) (&(&(indexed)(associated))(associated)) -> X -> (&(indexed)(associated)(associated))
	 *
	 * To avoid having special code for 2) and 3) we make them look like 
	 * 1) by flattening the filter - note this will only flatten subfilters
	 * which have no decoder assigned since the filter we flatten has no
	 * decoder assigned - and this behaviour is exactly what we want.
	 * 4) is a special case of 1) and since that is the case, we can allow
	 * the code for 1) to process it but make sure we flatten the filter
	 * before exit.  If 4) is exactly as stated without any other non-indexed
	 * or associated references then in fact it will be detected as a completely
	 * matching filter prior to reaching the code for 1).
	 */

	index_subsys_flatten_filter(flist);
	fhead = flist->f_list;

	/* find the first index assigned */
	for ( f_indexed = fhead; f_indexed != NULL; f_indexed = f_indexed->f_next )
	{
		if( f_indexed->assigned_decoder )
		{
			/* non-null decoder means assigned */
			break;
		}
	}

	if(NULL == f_indexed)
		/* nothing to process */
		goto bail;

	/* determine if whole filter matches
	 * to avoid allocations where it is
	 * not necessary
	 */
	for ( f = fhead; f != NULL; f = f->f_next )
	{
		if(f->assigned_decoder != f_indexed->assigned_decoder)
		{
			switch(f->f_choice)
			{
			case LDAP_FILTER_AND:
			case LDAP_FILTER_OR:
				/*
				 * All complex subfilters are guaranteed to have the correct
				 * decoder assigned already, so this is a mismatch.
				 */

				matched = 0;
				break;

			default:
				if(!index_subsys_index_matches_associated(f_indexed->assigned_decoder, f))
				{
					matched = 0;
				}
				break;
			}

			if(!matched)
				break;
		}
	}

	if(matched)
	{
		/* whole filter matches - assign to this decoder */
		flist->assigned_decoder = f_indexed->assigned_decoder;
		/* finally lets flatten this filter if possible
		 * Didn't we do that already?  No, we flattened the
		 * filter *prior* to assigning a decoder
		*/
		index_subsys_flatten_filter(flist);
		goto bail;
	}

	/* whole filter does not match so,
	 * if the sub-filter count is > 2
	 * for each assigned sub-filter,
	 * match other groupable filters
	 * and extract them into another sub-filter
	 */

	/* count */
	for ( f = fhead; f != NULL && index_count < 3; f = f->f_next )
	{
		index_count++;
	}

	if(index_count > 2)
	{
		/* this is where we start re-arranging the filter assertions
		 * into groups which can be serviced by a single plugin
		 * at this point we know that:
		 * a) the filter has at least 2 assertions that cannot be grouped
		 * b) there are more than 2 assertions and so grouping is still possible
		 */

		struct slapi_filter *f_listposition=f_indexed; /* flist subfilter list iterator */
		int main_iterate; /* controls whether to iterate to the next sub-filter of flist */
		
		while(f_listposition)
		{
			/* the target grouping container - we move sub-filters here */
			struct slapi_filter *targetf=0; 

			/* indicates we found an existing targetf */
			int assigned = 0; 

			struct slapi_filter *f_last = 0; /* previos filter in list */

			/* something to join with next compatible
			 * subfilter we find - this will be the 
			 * first occurence of a filter assigned
			 * to a particular decoder
			 */
			struct slapi_filter	*saved_filter = 0;

			struct slapi_filter	*f_tmp = 0; /* save filter for list fixups */

			/* controls whether to iterate to the
			 * next sub-filter of flist 
			 * inner loop
			 */
			int iterate = 1;

			f_indexed = f_listposition;
			main_iterate = 1;

			/* finding a convenient existing sub-filter of the same
			 * type as the containing filter avoids allocation
			 * so lets look for one
			 */

			for ( f = fhead; f != NULL; f = f->f_next)
			{
				switch(f->f_choice)
				{
				case LDAP_FILTER_AND:
				case LDAP_FILTER_OR:
					if( f->f_choice == flist->f_choice &&
						f->assigned_decoder == f_indexed->assigned_decoder)
					{
						targetf = f;
						assigned = 1;
					}
					break;

				default:
					break;
				}

				if(assigned)
					break;
			}

			/* now look for grouping opportunities */
			for ( f = fhead; f != NULL; (iterate && f) ? f = f->f_next : f )
			{
				iterate = 1;

				if(f != targetf)
				{
					switch(f->f_choice)
					{
					case LDAP_FILTER_AND:
					case LDAP_FILTER_OR:
						if( (targetf && f->f_choice == targetf->f_choice) 
							&& f->assigned_decoder == f_indexed->assigned_decoder)
						{
							/* ok we have a complex filter we can group - group it
							 * it is worth noting that if we got here, then we must
							 * have found a complex filter suitable for for putting
							 * sub-filters in, so there is no need to add the newly
							 * merged complex filter to the main complex filter, 
							 * since it is already there
							 */
							
							/* main filter list fix ups */
							f_tmp = f;
							f = f->f_next;
							iterate = 0;

							if(f_tmp == f_listposition)
							{
								f_listposition = f;
								main_iterate = 0;
							}

							/* remove f from the main complex filter */
							index_subsys_unlink_subfilter(flist, f_tmp);


							/* merge - note, not true merge since f gets
							 * added to targetf as a sub-filter
							 */
							slapi_filter_join_ex(targetf->f_choice, targetf, f_tmp, 0);	
							
							/* since it was not a true merge, lets do the true merge now */
							index_subsys_flatten_filter(targetf);
						}
						break;

					default:
						if(index_subsys_index_matches_associated(f_indexed->assigned_decoder, f))
						{
							if(targetf)
							{
								/* main filter list fix ups */
								f_tmp = f;
								f = f->f_next;
								iterate = 0;
								
								if(f_tmp == f_listposition)
								{
									f_listposition = f;
									main_iterate = 0;
								}

								index_subsys_unlink_subfilter(flist, f_tmp);
								targetf = slapi_filter_join_ex( targetf->f_choice, targetf, f_tmp, 0 );
							}
							else
							{
								if(saved_filter)
								{
									/* main filter list fix ups */
									f_tmp = f;
									f = f->f_next;
									iterate = 0;

									if(f_tmp == f_listposition)
									{
										f_listposition = f;
										main_iterate = 0;
									}

									index_subsys_unlink_subfilter(flist, f_tmp);
									index_subsys_unlink_subfilter(flist, saved_filter);
									targetf = slapi_filter_join_ex( flist->f_choice, saved_filter, f_tmp, 0 );
									targetf->assigned_decoder = f_indexed->assigned_decoder;
								}
								else
								{
									/* nothing to join so save this for
									 * when we find another compatible
									 * filter
									 */
									saved_filter = f;
								}
							}
							
							if(!assigned && targetf)
							{
								/* targetf has just been created, so
								 * we must add it to the main complex filter
								 */
								targetf->f_next = flist->f_list;
								flist->f_list = targetf;
								assigned = 1;
							}
						}

						break;
					}
				}

				f_last = f;
			}

			/* iterate through the main list 
			 * to the next indexed sub-filter
			 */
			if(	f_listposition && 
					(main_iterate ||
						(!main_iterate && 
						!f_listposition->assigned_decoder)))
			{
				if(!f_listposition->f_next)
				{
					f_listposition = 0;
					break;
				}

				for ( f_listposition = f_listposition->f_next; f_listposition != NULL; f_listposition = f_listposition->f_next )
				{
					if( f_listposition->assigned_decoder )
					{
						/* non-null decoder means assigned */
						break;
					}
				}
			}
		}
	}

bail:

	return ret;
}


/* index_subsys_assign_decoders
 * ----------------------------
 * recurses through complex filters
 * assigning decoders
 */
static int index_subsys_assign_decoders(Slapi_Filter *f)
{
	int ret = 0;
	Slapi_Filter *subf;

	switch ( slapi_filter_get_choice( f ) ) 
	{
	case LDAP_FILTER_AND:
	case LDAP_FILTER_OR:
	case LDAP_FILTER_NOT:
		/* assign simple filters first */
		f->assigned_decoder = 0;
		for(subf=f->f_list; subf != NULL; subf = subf->f_next )
			ret = index_subsys_assign_decoders(subf);

		/* now check for filter grouping opportunities... */
		if(slapi_filter_get_choice( f ) != LDAP_FILTER_NOT)
			index_subsys_group_decoders(f);
		else
		{
			/* LDAP_FILTER_NOT is a special case
			 * the contained sub-filter determines
			 * the assigned index - the index plugin has
			 * the option to refuse to service the
			 * NOT filter when it is presented
			 */
			f->assigned_decoder = f->f_list->assigned_decoder;
		}

		break;

	default:
		/* find a decoder that fits this simple filter */
		ret = index_subsys_assign_decoder(f);
	}

	return ret;
}

/* index_subsys_decoder_done
 * -------------------------
 * recurses through complex filters
 * removing decoders
 */
static int index_subsys_decoder_done(Slapi_Filter *f)
{
	int ret = 0;
	Slapi_Filter *subf;
	indexEntry *index;
	indexEntry *next_index;
	
	switch ( slapi_filter_get_choice( f ) ) 
	{
	case LDAP_FILTER_AND:
	case LDAP_FILTER_OR:
	case LDAP_FILTER_NOT:
		/* remove simple filters first */
		for(subf=f->f_list; subf != NULL; subf = subf->f_next )
			ret = index_subsys_decoder_done(subf);

		break;

	default:
		/* free the decoders - shallow free */
		index = f->assigned_decoder;

		while(index)
		{
			next_index = index->list.pNext;
			slapi_ch_free((void**)index);
			index = next_index;
		}

		f->assigned_decoder = 0;
	}

	return ret;
}

/* index_subsys_evaluate_filter
 * ----------------------------
 * passes the filter to the correct plugin
 * index_subsys_assign_filter_decoders() must
 * have been called previously on this filter
 * this function can be safely called on all
 * filters post index_subsys_assign_filter_decoders()
 * whether they are assigned to a plugin or not
 * 
 * returns:
 * INDEX_FILTER_EVALUTED: a candidate list is produced
 * INDEX_FILTER_UNEVALUATED: filter not considered
 */
int index_subsys_evaluate_filter(Slapi_Filter *f, Slapi_DN *namespace_dn, IndexEntryList **out)
{
	int ret = INDEX_FILTER_UNEVALUATED;
	indexEntry *index = (indexEntry*)f->assigned_decoder;

	if(index && theCache)
	{
		index_subsys_read_lock();

		/* there is a list of indexes which may
		 * provide an answer for this filter, we
		 * need to invoke the first one that matches
		 * the namespace requested
		 */
		for(; index; index = index->list.pNext)
		{
			/* check namespace */
			if(slapi_sdn_compare(index->namespace_dn, namespace_dn))
			{
				/* wrong namespace */
				continue;
			}

			/* execute the search */
			if(index->lookup_func)
			{
				ret = (index->lookup_func)(f, out, index->user_data);
				break;
			}
		}

		index_subsys_unlock();
	}

	return ret;
}


/* slapi_index_register_decoder
 * ----------------------------
 * This allows a decoder to register itself,
 * it also builds the initial cache when first
 * called.  Note, there is no way to deregister
 * once registered - this allows a lock free cache
 * at the expense of a restart to clear old
 * indexes, this shouldnt be a problem since it is
 * not expected that indexes will be removed
 * often.
 */
int slapi_index_register_decoder(char *plugin_id, index_validate_callback validate_op)
{
	static int firstTime = 1;
	static int gotIDLapi = 0;
	int ret = 0;
	indexPlugin *plg;

	if(firstTime)
	{
		/* create the cache */
		theCache = (globalIndexCache*)slapi_ch_malloc(sizeof(globalIndexCache));
		if(theCache)
		{
			theCache->pPlugins = 0;
			theCache->ppIndexIndex = 0;
			theCache->index_count = 0;
			theCache->cache_lock = PR_NewRWLock(PR_RWLOCK_RANK_NONE, "Index Plugins");;
			firstTime = 0;

			if(!gotIDLapi)
			{
				if(slapi_apib_get_interface(IDL_v1_0_GUID, &idl_api))
				{
					gotIDLapi = 1;
				}
			}
		}
		else
		{
			ret = -1;
			goto bail;
		}
	}

	index_subsys_write_lock();

	/* add the index decoder to the cache - no checks, better register once only*/
	plg = (indexPlugin*)slapi_ch_malloc(sizeof(indexPlugin));
	if(plg)
	{
		plg->id = slapi_ch_strdup(plugin_id);
		plg->indexes = 0;
		plg->validate_op = validate_op;

		/* we always add to the start of the linked list */
		plg->list.pPrev = 0;

		if(theCache->pPlugins)
		{
			plg->list.pNext = theCache->pPlugins;
			theCache->pPlugins->list.pPrev = plg;
		}
		else
			plg->list.pNext = 0;


		theCache->pPlugins = plg;
	}
	else
		ret = -1;

	index_subsys_unlock();

bail:
	return ret;
}


/* slapi_index_register_index
 * --------------------------
 * a plugin that has registered itself may
 * then proceed to register individual indexes
 * that it looks after.  This function adds
 * the indexes to the plugin cache.
 */
int slapi_index_register_index(char *plugin_id, indexed_item *registration_item, void *user_data)
{
	int ret = 0;
	indexPlugin *plg;
	indexEntry *index;
	int a_matched_index = 0;
	Slapi_Filter *tmp_f = slapi_str2filter(registration_item->index_filter);
	Slapi_Backend *be;
	
	if(!theCache)
		return -1;

	index_subsys_write_lock();

	/* first lets find the plugin */
	plg = theCache->pPlugins;

	while(plg)
	{
		if(!slapi_UTF8CASECMP(plugin_id, plg->id))
		{
			/* found plugin */
			break;
		}

		plg = plg->list.pNext;
	}

	if(0 == plg)
	{
		/* not found */
		ret = -1;
		goto bail;
	}

	/* now add the new index - we shall assume indexes 
	 * will not be registered twice by different plugins,
	 * in that event, the last one added wins
	 * the first matching index in the list is always
	 * the current one, other matching indexes are ignored
	 * therefore reregistering an index with NULL
	 * callbacks disables the index for that plugin
	 */

	/* find the index if already registered */

	index = plg->indexes;

	while(index)
	{
		if(index_subsys_index_matches_filter(index, tmp_f))
		{
			/* found it - free what is currently there, it will be replaced */
			slapi_ch_free((void**)&index->indexfilterstr);
			slapi_filter_free(index->indexfilter, 1);
			slapi_ch_free((void**)&index->indexedAttribute);
			
			charray_free( index->associated_attrs );
			index->associated_attrs = 0;

			a_matched_index = 1;
			break;
		}

		index = index->list.pNext;
	}

	if(!index)
		index = (indexEntry*)slapi_ch_calloc(1,sizeof(indexEntry));

	index->indexfilterstr = slapi_ch_strdup(registration_item->index_filter);
	index->indexfilter = tmp_f;
	index->lookup_func = registration_item->search_op;
	index->user_data = user_data;

	/* map the namespace dn to a backend dn */
	be = slapi_be_select( registration_item->namespace_dn );
	
	if(be == defbackend_get_backend())
	{
		ret = -1;
		goto bail;
	}

	index->namespace_dn = (Slapi_DN*)slapi_be_getsuffix(be, 0);
	
	/* for now, we support simple filters only */
	index->indexedAttribute = slapi_ch_strdup(index->indexfilter->f_type);

	/* add associated attributes */
	if(registration_item->associated_attrs)
	{
		index->associated_attrs =
			cool_charray_dup( registration_item->associated_attrs );
	}

	if(!a_matched_index)
	{
		if(plg->indexes)
		{
			index->list.pNext = plg->indexes;
			plg->indexes->list.pPrev = plg;
		}
		else
			index->list.pNext = 0;

		index->list.pPrev = 0;

		plg->indexes = index;

		theCache->index_count++;
	}

	/* now we need to rebuild the index (onto the indexed items)
	 * this is a bit inefficient since
	 * every new index that is added triggers
	 * an index rebuild - but this is countered
	 * by the fact this will probably happen once
	 * at start up most of the time, and very rarely
	 * otherwise, so normal server performance should
	 * not be unduly effected effected
	 * we take care to build the index and only then swap it in
	 * for the old one
	 * PARPAR: we need to RW (or maybe a ref count) lock here
	 * only alternative would be to not have an index :(
	 * for few plugins with few indexes thats a possibility
	 * traditionally many indexes have not been a good idea
	 * anyway so...
	 */
	
/*	indexIndex = (indexEntry**)slapi_ch_malloc(sizeof(indexEntry*) * (theCache->index_count+1));
*/
	/* for now, lets see how fast things are without an index
	 * that should not be a problem at all to begin with since
	 * presence will be the only index decoder.  Additionally,
	 * adding an index means we need locks - um, no.
	 * so no more to do
	 */

bail:
	index_subsys_unlock();

	return ret;
}

/* index_subsys_index_matches_index
 * --------------------------------
 * criteria for a match is that the types
 * are the same and that all the associated
 * attributes that are configured for cmp_to
 * are also in cmp_with.
 */
int index_subsys_index_matches_index(indexEntry *cmp_to, indexEntry *cmp_with)
{
	int ret = 0;

	if(slapi_attr_types_equivalent(cmp_to->indexedAttribute, cmp_with->indexedAttribute))
	{
		/* now check associated */
		if(cmp_to->associated_attrs)
		{
			if(cmp_with->associated_attrs)
			{
				int x,y;

				ret = 1;

				for(x=0; cmp_to->associated_attrs[x]  && ret == 1; x++)
				{
					ret = 0;

					for(y=0; cmp_with->associated_attrs[y]; y++)
					{
						if(slapi_attr_types_equivalent(
							cmp_to->associated_attrs[x], 
							cmp_with->associated_attrs[y]
							))
						{
							/* matched on associated attribute */
							ret = 1;
							break;
						}
					}
				}
			}
		}
		else
		{
			/* no associated is an auto match */
			ret = 1;
		}

	}

	return ret;
}

indexEntry *index_subsys_index_shallow_dup(indexEntry *dup_this)
{
	indexEntry *ret = (indexEntry *)slapi_ch_calloc(1,sizeof(indexEntry));

	/* shallow copy - dont copy linked list pointers */
	ret->indexedAttribute = dup_this->indexedAttribute;
	ret->indexfilter = dup_this->indexfilter;
	ret->indexfilterstr = dup_this->indexfilterstr;
	ret->user_data = dup_this->user_data;
	ret->namespace_dn = dup_this->namespace_dn;
    ret->lookup_func = dup_this->lookup_func;
	ret->associated_attrs = dup_this->associated_attrs;

	return ret;
}

/* index_subsys_assign_decoder
 * ---------------------------
 * finds a decoder which will service
 * the filter if one is available and assigns
 * the decoder to the filter.  Currently this
 * function supports only simple filters, but
 * may in the future support complex filter
 * assignment (possibly including filter rewriting
 * to make more matches possible)
 *
 * populates f->alternate_decoders if more than one
 * index could deal with a filter - only filters that
 * have a match with all associated attributes of the
 * first found filter are said to match - their
 * namespaces may be different
 */
static int index_subsys_assign_decoder(Slapi_Filter *f)
{
	int ret = 0; /* always succeed */
	indexPlugin *plg;
	indexEntry *index = 0;
	indexEntry *last = 0;
		
	f->assigned_decoder = 0;

	if(!theCache)
		return 0;

	index_subsys_read_lock();

	plg = theCache->pPlugins;

	while(plg)
	{
		index = plg->indexes;

		while(index)
		{
			if(INDEX_MATCH_NONE != index_subsys_index_matches_filter(index, f))
			{
				/* we have a match, assign this decoder if not disabled */
				if(index->lookup_func)
				{
					if(!f->assigned_decoder)
					{
						f->assigned_decoder = index_subsys_index_shallow_dup(index);
						last = f->assigned_decoder;
					}
					else
					{
						/* add to alternate list - we require that they all
						 * have the same associated attributes configuration for now
						 * though they may have different namespaces
						 */
						if(index_subsys_index_matches_index(f->assigned_decoder, index))
						{
							/* add to end */
							last->list.pNext = index_subsys_index_shallow_dup(index);
							last = last->list.pNext;
						}
					}
				}
				else
				{
					/* index disabled, so we must allow another plugin to
					 * get a crack at servicing the index
					 */
					break;
				}
			}

			index = index->list.pNext;
		}

		plg = plg->list.pNext;
	}

	index_subsys_unlock();

	return ret;
}